This is an automated email from the ASF dual-hosted git repository.

briansolo1985 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 903090b649 NIFI-13436 Retain queue for non-modified connections during MiNiFi flow update 903090b649 is described below commit 903090b64956a7afbc9de37c80a64166880483e6 Author: Ferenc Erdei <erdei.feren...@gmail.com> AuthorDate: Mon Jun 24 15:03:02 2024 +0200 NIFI-13436 Retain queue for non-modified connections during MiNiFi flow update This closes #8998. Signed-off-by: Ferenc Kis <briansolo1...@gmail.com> --- .../DefaultUpdateConfigurationStrategy.java | 110 ++++++++++++++------- 1 file changed, 72 insertions(+), 38 deletions(-) diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java index 678ab912b7..d5ccac1e53 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java @@ -17,41 +17,18 @@ package org.apache.nifi.minifi.c2.command; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.UUID.randomUUID; -import static java.util.function.Predicate.not; -import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES; -import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT; -import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE; -import static org.apache.nifi.components.validation.ValidationStatus.INVALID; -import static 
org.apache.nifi.components.validation.ValidationStatus.VALIDATING; -import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION; -import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Stream; import org.apache.commons.io.FilenameUtils; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy; import org.apache.nifi.components.AsyncLoadedProcessor; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.flow.VersionedDataflow; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.minifi.commons.service.FlowEnrichService; @@ -61,6 +38,36 @@ import org.apache.nifi.services.FlowService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import 
java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.emptySet; +import static java.util.UUID.randomUUID; +import static java.util.function.Predicate.not; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE; +import static org.apache.nifi.components.validation.ValidationStatus.INVALID; +import static org.apache.nifi.components.validation.ValidationStatus.VALIDATING; +import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION; +import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert; + public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class); @@ -105,8 +112,12 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt if (LOGGER.isDebugEnabled()) { LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8)); } - + Set<String> originalConnectionIds = emptySet(); try { + originalConnectionIds = findAllExistingConnections(flowController.getFlowManager().getRootGroup()) + .stream() + 
.map(Connection::getIdentifier) + .collect(Collectors.toSet()); VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow); VersionedDataflow propertyEncryptedRawDataFlow = flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow); @@ -120,7 +131,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt persist(serializedPropertyEncryptedRawDataFlow, rawFlowConfigurationFile, false); persist(serializedEnrichedFlowCandidate, flowConfigurationFile, true); - reloadFlow(); + reloadFlow(findAllProposedConnectionIds(enrichedFlowCandidate.getRootGroup())); return true; } catch (IllegalStateException e) { @@ -128,7 +139,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt revert(backupFlowConfigurationFile, flowConfigurationFile); revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile); try { - reloadFlow(); + reloadFlow(originalConnectionIds); } catch (IOException ex) { LOGGER.error("Unable to reload the reverted flow", e); } @@ -144,9 +155,9 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt } } - private void reloadFlow() throws IOException { + private void reloadFlow(Set<String> proposedConnectionIds) throws IOException { LOGGER.info("Initiating flow reload"); - stopFlowGracefully(flowController.getFlowManager().getRootGroup()); + stopFlowGracefully(flowController.getFlowManager().getRootGroup(), proposedConnectionIds); flowService.load(null); flowController.onFlowInitialized(true); @@ -161,7 +172,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt LOGGER.info("Flow has been reloaded successfully"); } - private void stopFlowGracefully(ProcessGroup rootGroup) { + private void stopFlowGracefully(ProcessGroup rootGroup, Set<String> proposedConnectionIds) { LOGGER.info("Stopping flow gracefully"); Optional<ProcessGroup> drainResult = stopSourceProcessorsAndWaitFlowToDrain(rootGroup); @@ -169,12 +180,17 @@ public class 
DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt rootGroup.getRemoteProcessGroups().stream() .map(RemoteProcessGroup::stopTransmitting) .forEach(this::waitForStopOrLogTimeOut); - drainResult.ifPresentOrElse( - rootProcessGroup -> { - LOGGER.warn("Flow did not stop within graceful period. Force stopping flow and emptying queues"); - rootProcessGroup.dropAllFlowFiles(randomUUID().toString(), randomUUID().toString()); - }, - () -> LOGGER.info("Flow has been stopped gracefully")); + drainResult.ifPresentOrElse(emptyQueuesForNonReferencedQueues(proposedConnectionIds), () -> LOGGER.info("Flow has been stopped gracefully")); + } + + private Consumer<ProcessGroup> emptyQueuesForNonReferencedQueues(Set<String> proposedConnectionIds) { + return rootProcessGroup -> { + LOGGER.warn("Flow did not stop within graceful period. Force stopping flow and emptying non referenced queues"); + findAllExistingConnections(rootProcessGroup).stream() + .filter(connection -> !proposedConnectionIds.contains(connection.getIdentifier())) + .map(Connection::getFlowFileQueue) + .forEach(queue -> queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString())); + }; } private Optional<ProcessGroup> stopSourceProcessorsAndWaitFlowToDrain(ProcessGroup rootGroup) { @@ -262,4 +278,22 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt } } } + + private Set<String> findAllProposedConnectionIds(VersionedProcessGroup versionedProcessGroup) { + return versionedProcessGroup == null + ? emptySet() + : Stream.concat( + versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier), + versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream) + ).collect(Collectors.toSet()); + } + + private Set<Connection> findAllExistingConnections(ProcessGroup processGroup) { + return processGroup == null + ? 
emptySet() + : Stream.concat( + processGroup.getConnections().stream(), + processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream) + ).collect(Collectors.toSet()); + } }