This is an automated email from the ASF dual-hosted git repository.

briansolo1985 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 903090b649 NIFI-13436 Retain queue for non-modified connections during MiNiFi flow update
903090b649 is described below

commit 903090b64956a7afbc9de37c80a64166880483e6
Author: Ferenc Erdei <erdei.feren...@gmail.com>
AuthorDate: Mon Jun 24 15:03:02 2024 +0200

    NIFI-13436 Retain queue for non-modified connections during MiNiFi flow update
    
    This closes #8998.
    
    Signed-off-by: Ferenc Kis <briansolo1...@gmail.com>
---
 .../DefaultUpdateConfigurationStrategy.java        | 110 ++++++++++++++-------
 1 file changed, 72 insertions(+), 38 deletions(-)

diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
index 678ab912b7..d5ccac1e53 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
@@ -17,41 +17,18 @@
 
 package org.apache.nifi.minifi.c2.command;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.UUID.randomUUID;
-import static java.util.function.Predicate.not;
-import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
-import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
-import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
-import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
-import static 
org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
-import static 
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
-import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
-import static 
org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Stream;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
 import org.apache.nifi.components.AsyncLoadedProcessor;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedProcessGroup;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.minifi.commons.service.FlowEnrichService;
@@ -61,6 +38,36 @@ import org.apache.nifi.services.FlowService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.UUID.randomUUID;
+import static java.util.function.Predicate.not;
+import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
+import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
+import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
+import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
+import static 
org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
+import static 
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
+import static 
org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
+
 public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationStrategy {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
@@ -105,8 +112,12 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Attempting to update flow with content: \n{}", new 
String(rawFlow, UTF_8));
         }
-
+        Set<String> originalConnectionIds = emptySet();
         try {
+            originalConnectionIds = 
findAllExistingConnections(flowController.getFlowManager().getRootGroup())
+                    .stream()
+                    .map(Connection::getIdentifier)
+                    .collect(Collectors.toSet());
             VersionedDataflow rawDataFlow = 
flowSerDeService.deserialize(rawFlow);
 
             VersionedDataflow propertyEncryptedRawDataFlow = 
flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow);
@@ -120,7 +131,7 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
             persist(serializedPropertyEncryptedRawDataFlow, 
rawFlowConfigurationFile, false);
             persist(serializedEnrichedFlowCandidate, flowConfigurationFile, 
true);
 
-            reloadFlow();
+            
reloadFlow(findAllProposedConnectionIds(enrichedFlowCandidate.getRootGroup()));
 
             return true;
         } catch (IllegalStateException e) {
@@ -128,7 +139,7 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
             revert(backupFlowConfigurationFile, flowConfigurationFile);
             revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
             try {
-                reloadFlow();
+                reloadFlow(originalConnectionIds);
             } catch (IOException ex) {
                 LOGGER.error("Unable to reload the reverted flow", e);
             }
@@ -144,9 +155,9 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
         }
     }
 
-    private void reloadFlow() throws IOException {
+    private void reloadFlow(Set<String> proposedConnectionIds) throws 
IOException {
         LOGGER.info("Initiating flow reload");
-        stopFlowGracefully(flowController.getFlowManager().getRootGroup());
+        stopFlowGracefully(flowController.getFlowManager().getRootGroup(), 
proposedConnectionIds);
 
         flowService.load(null);
         flowController.onFlowInitialized(true);
@@ -161,7 +172,7 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
         LOGGER.info("Flow has been reloaded successfully");
     }
 
-    private void stopFlowGracefully(ProcessGroup rootGroup) {
+    private void stopFlowGracefully(ProcessGroup rootGroup, Set<String> 
proposedConnectionIds) {
         LOGGER.info("Stopping flow gracefully");
         Optional<ProcessGroup> drainResult = 
stopSourceProcessorsAndWaitFlowToDrain(rootGroup);
 
@@ -169,12 +180,17 @@ public class DefaultUpdateConfigurationStrategy 
implements UpdateConfigurationSt
         rootGroup.getRemoteProcessGroups().stream()
             .map(RemoteProcessGroup::stopTransmitting)
             .forEach(this::waitForStopOrLogTimeOut);
-        drainResult.ifPresentOrElse(
-            rootProcessGroup -> {
-                LOGGER.warn("Flow did not stop within graceful period. Force 
stopping flow and emptying queues");
-                rootProcessGroup.dropAllFlowFiles(randomUUID().toString(), 
randomUUID().toString());
-            },
-            () -> LOGGER.info("Flow has been stopped gracefully"));
+        
drainResult.ifPresentOrElse(emptyQueuesForNonReferencedQueues(proposedConnectionIds),
 () -> LOGGER.info("Flow has been stopped gracefully"));
+    }
+
+    private Consumer<ProcessGroup> 
emptyQueuesForNonReferencedQueues(Set<String> proposedConnectionIds) {
+        return rootProcessGroup -> {
+            LOGGER.warn("Flow did not stop within graceful period. Force 
stopping flow and emptying non referenced queues");
+            findAllExistingConnections(rootProcessGroup).stream()
+                    .filter(connection -> 
!proposedConnectionIds.contains(connection.getIdentifier()))
+                    .map(Connection::getFlowFileQueue)
+                    .forEach(queue -> 
queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString()));
+        };
     }
 
     private Optional<ProcessGroup> 
stopSourceProcessorsAndWaitFlowToDrain(ProcessGroup rootGroup) {
@@ -262,4 +278,22 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
             }
         }
     }
+
+    private Set<String> findAllProposedConnectionIds(VersionedProcessGroup 
versionedProcessGroup) {
+        return versionedProcessGroup == null
+                ? emptySet()
+                : Stream.concat(
+                    
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
+                    
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
+                ).collect(Collectors.toSet());
+    }
+
+    private Set<Connection> findAllExistingConnections(ProcessGroup 
processGroup) {
+        return processGroup == null
+                ? emptySet()
+                : Stream.concat(
+                    processGroup.getConnections().stream(),
+                    
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
+                ).collect(Collectors.toSet());
+    }
 }

Reply via email to