This is an automated email from the ASF dual-hosted git repository. bejancsaba pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new bc589e273d NIFI-13020 Fix legacy flow C2 update issue bc589e273d is described below commit bc589e273d4e29dad8837a64895f60fb4a56173b Author: Ferenc Kis <briansolo1...@gmail.com> AuthorDate: Tue Apr 9 12:14:54 2024 +0200 NIFI-13020 Fix legacy flow C2 update issue Signed-off-by: Csaba Bejan <bejan.cs...@gmail.com> This closes #8624 --- .../commons/service/StandardFlowEnrichService.java | 112 ++++++++++++++++++++- 1 file changed, 108 insertions(+), 4 deletions(-) diff --git a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java index 6a73992d15..95cc77abec 100644 --- a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java +++ b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java @@ -20,26 +20,37 @@ package org.apache.nifi.minifi.commons.service; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.parseBoolean; import static java.util.Map.entry; +import static java.util.Objects.isNull; import static java.util.Optional.empty; import static java.util.Optional.ofNullable; import static java.util.UUID.randomUUID; import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.nifi.flow.ScheduledState.ENABLED; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Stream; import 
org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.flow.VersionedDataflow; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.ComponentType; +import org.apache.nifi.flow.ConnectableComponent; import org.apache.nifi.flow.ControllerServiceAPI; +import org.apache.nifi.flow.Position; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedComponent; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedRemoteProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.minifi.commons.api.MiNiFiProperties; @@ -68,6 +79,10 @@ public class StandardFlowEnrichService implements FlowEnrichService { private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT = "nifi-site-to-site-reporting-nar"; private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP"; private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM = "beginning-of-stream"; + private static final String DEFAULT_BULLETIN_LEVEL = "WARN"; + private static final String DEFAULT_EXECUTION_NODE = "ALL"; + private static final Position DEFAULT_POSITION = new Position(0.0, 0.0); + private static final Predicate<? 
super VersionedComponent> IS_LEGACY_COMPONENT = versionedComponent -> isBlank(versionedComponent.getInstanceIdentifier()); private final ReadableProperties minifiProperties; @@ -86,11 +101,11 @@ public class StandardFlowEnrichService implements FlowEnrichService { maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount); VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup(); - if (rootGroup.getIdentifier() == null) { + if (isBlank(rootGroup.getIdentifier())) { rootGroup.setIdentifier(randomUUID().toString()); } - if (rootGroup.getInstanceIdentifier() == null) { - rootGroup.setInstanceIdentifier(randomUUID().toString()); + if (isNull(rootGroup.getPosition())) { + rootGroup.setPosition(DEFAULT_POSITION); } rootGroup.getControllerServices().forEach(controllerService -> controllerService.setScheduledState(ENABLED)); @@ -106,6 +121,13 @@ public class StandardFlowEnrichService implements FlowEnrichService { createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY)) .ifPresent(versionedDataflow.getReportingTasks()::add); + if (IS_LEGACY_COMPONENT.test(rootGroup)) { + LOG.info("Legacy flow detected. 
Initializing missing but mandatory properties on components");
+            initializeComponentsMissingProperties(rootGroup);
+            Map<String, String> idToInstanceIdMap = createIdToInstanceIdMap(rootGroup);
+            setConnectableComponentsInstanceId(rootGroup, idToInstanceIdMap);
+        }
+
         return versionedDataflow;
     }
 
@@ -225,6 +247,88 @@ public class StandardFlowEnrichService implements FlowEnrichService {
                 entry("SSL Context Service", sslControllerServiceIdentifier))
             .stream()
             .filter(entry -> StringUtils.isNotBlank(entry.getValue()))
-            .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+            .collect(toMap(Entry::getKey, Entry::getValue));
+    }
+    // Gives the group a random instance id, does the same for each component whose instance id is blank, then recurses into child groups.
+    private void initializeComponentsMissingProperties(VersionedProcessGroup versionedProcessGroup) {
+        versionedProcessGroup.setInstanceIdentifier(randomUUID().toString());
+        // Every collection getter (including remote group port sets and child groups) is treated as empty when it returns null.
+        Stream.of(
+                ofNullable(versionedProcessGroup.getControllerServices()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getConnections()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getProcessors()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getInputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getOutputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getFunnels()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(remoteProcessGroup -> ofNullable(remoteProcessGroup.getInputPorts()).orElse(Set.of()))
+                    .flatMap(Set::stream)
+                    .collect(toSet()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(remoteProcessGroup -> ofNullable(remoteProcessGroup.getOutputPorts()).orElse(Set.of()))
+                    .flatMap(Set::stream)
+                    .collect(toSet()))
+            .flatMap(Set::stream)
+            .filter(IS_LEGACY_COMPONENT)
+            .forEach(versionedComponent -> {
+                versionedComponent.setInstanceIdentifier(randomUUID().toString());
+                if (versionedComponent instanceof VersionedProcessor processor) { // processors also get defaults for blank fields
+                    if (isBlank(processor.getBulletinLevel())) {
+                        processor.setBulletinLevel(DEFAULT_BULLETIN_LEVEL);
+                    }
+                    if (isBlank(processor.getExecutionNode())) {
+                        processor.setExecutionNode(DEFAULT_EXECUTION_NODE);
+                    }
+                }
+            });
+
+        ofNullable(versionedProcessGroup.getProcessGroups()).orElse(Set.of()).forEach(this::initializeComponentsMissingProperties);
+    }
+    // Maps identifier -> instance identifier for the processors, ports, funnels and remote group ports of the group and all its child groups.
+    private Map<String, String> createIdToInstanceIdMap(VersionedProcessGroup versionedProcessGroup) {
+        Map<String, String> thisProcessGroupIdToInstanceIdMaps = Stream.of(
+                ofNullable(versionedProcessGroup.getProcessors()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getInputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getOutputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getFunnels()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(remoteProcessGroup -> ofNullable(remoteProcessGroup.getInputPorts()).orElse(Set.of()))
+                    .flatMap(Set::stream)
+                    .collect(toSet()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(remoteProcessGroup -> ofNullable(remoteProcessGroup.getOutputPorts()).orElse(Set.of()))
+                    .flatMap(Set::stream)
+                    .collect(toSet())
+            )
+            .flatMap(Set::stream)
+            .collect(toMap(VersionedComponent::getIdentifier, VersionedComponent::getInstanceIdentifier));
+
+        Stream<Map<String, String>> childProcessGroupsIdToInstanceIdMaps = ofNullable(versionedProcessGroup.getProcessGroups()).orElse(Set.of())
+            .stream()
+            .map(this::createIdToInstanceIdMap);
+
+        return Stream.concat(
+                Stream.of(thisProcessGroupIdToInstanceIdMaps),
+                childProcessGroupsIdToInstanceIdMaps)
+            .map(Map::entrySet)
+            .flatMap(Set::stream)
+            .collect(toMap(Entry::getKey, Entry::getValue));
+    }
+    // Sets the instance id of every connection's source and destination to the value mapped from its id, in this group and its child groups.
+    private void setConnectableComponentsInstanceId(VersionedProcessGroup versionedProcessGroup, Map<String, String> idToInstanceIdMap) {
+        ofNullable(versionedProcessGroup.getConnections()).orElse(Set.of())
+            .forEach(connection -> {
+                ConnectableComponent source = connection.getSource();
+                source.setInstanceIdentifier(idToInstanceIdMap.get(source.getId()));
+                ConnectableComponent destination = connection.getDestination();
+                destination.setInstanceIdentifier(idToInstanceIdMap.get(destination.getId()));
+            });
+        ofNullable(versionedProcessGroup.getProcessGroups()).orElse(Set.of())
+            .forEach(childProcessGroup -> setConnectableComponentsInstanceId(childProcessGroup, idToInstanceIdMap));
     }
 }