This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 0b7d11776b786a6e73468d48252f8dc47cef6662 Author: markap14 <marka...@hotmail.com> AuthorDate: Fri May 13 15:09:23 2022 -0400 NIFI-10001: When enabling a collection of Controller Services, change… (#6042) * NIFI-10001: When enabling a collection of Controller Services, changed logic. Instead of enabling dependent services and waiting for them to complete enablement before starting a given service, just start the services given. The previous logic was necessary long ago because we couldn't enable a service unless all dependent services were fully enabled. But that changed a while ago. Now, we can enable a service when it's invalid. It'll just keep trying to enable until it becomes valid [...] * NIFI-10001: Restored previous implementation for StandardControllerServiceProvider, as the changes were not ultimately what we needed. Changed StandardProcessGroup to use a ConcurrentHashMap for controller services instead of a HashMap with readLock. This was causing a deadlock when we enable a Controller Service that references another service during flow synchronization. Flow Synchronization was happening within a write lock and enabling the service required a read lock on the gro [...] --- .../service/StandardControllerServiceProvider.java | 8 ++--- .../apache/nifi/groups/StandardProcessGroup.java | 19 +++--------- .../groups/StandardProcessGroupSynchronizer.java | 21 +++++++++---- .../apache/nifi/web/StandardNiFiServiceFacade.java | 2 ++ .../system/clustering/FlowSynchronizationIT.java | 34 ++++++++++++++++++++++ 5 files changed, 59 insertions(+), 25 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index a8e032a73a..592d096c8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -449,11 +449,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService(); } - final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true); - for (final ControllerServiceNode serviceNode : servicesForGroup) { - if (serviceIdentifier.equals(serviceNode.getIdentifier())) { - return serviceNode.getProxiedControllerService(); - } + final ControllerServiceNode serviceNode = groupOfInterest.findControllerService(serviceIdentifier, false, true); + if (serviceNode != null) { + return serviceNode.getProxiedControllerService(); } return null; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 1924f9bcfb..33fb77c620 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -133,6 +133,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -184,7 +185,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>(); private final Map<String, ProcessorNode> processors = new HashMap<>(); private final Map<String, Funnel> funnels = new HashMap<>(); - private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>(); + private final Map<String, ControllerServiceNode> controllerServices = new ConcurrentHashMap<>(); private final Map<String, Template> templates = new HashMap<>(); private final PropertyEncryptor encryptor; private final MutableVariableRegistry variableRegistry; @@ -2311,24 +2312,12 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public ControllerServiceNode getControllerService(final String id) { - readLock.lock(); - try { - return controllerServices.get(requireNonNull(id)); - } finally { - readLock.unlock(); - } + return controllerServices.get(requireNonNull(id)); } @Override public Set<ControllerServiceNode> getControllerServices(final boolean recursive) { - final Set<ControllerServiceNode> services = new HashSet<>(); - - readLock.lock(); - try { - services.addAll(controllerServices.values()); - } finally { - readLock.unlock(); - } + final Set<ControllerServiceNode> services = new HashSet<>(controllerServices.values()); if (recursive) { final ProcessGroup parentGroup = parent.get(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java index 8c5592dc18..fc3eb383d0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java @@ -165,6 +165,12 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize if (FlowDifferenceFilters.isScheduledStateNew(diff)) { continue; } + // If the difference type is a Scheduled State Change, we want to ignore it, because we are just trying to + // find components that need to be stopped in order to be updated. We don't need to stop a component in order + // to change its Scheduled State. + if (diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED) { + continue; + } // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level // and if so compare our VersionedControllerService to the existing service. @@ -196,12 +202,17 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize } if (LOG.isInfoEnabled()) { - final String differencesByLine = flowComparison.getDifferences().stream() - .map(FlowDifference::toString) - .collect(Collectors.joining("\n")); + final Set<FlowDifference> differences = flowComparison.getDifferences(); + if (differences.isEmpty()) { + LOG.info("No differences between current flow and proposed flow for {}", group); + } else { + final String differencesByLine = differences.stream() + .map(FlowDifference::toString) + .collect(Collectors.joining("\n")); - LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow, - flowComparison.getDifferences().size(), differencesByLine); + LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow, + differences.size(), differencesByLine); + } } final Set<String> knownVariables = getKnownVariableNames(group); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4cafa8429f..d3409fdde0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -2890,6 +2890,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() { final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); + controllerFacade.save(); + final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences(); // get the revisions of the updated components diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index 7a5d89e1a2..faf1409b9f 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -873,6 +873,40 @@ public class FlowSynchronizationIT extends NiFiSystemIT { }); } + @Test + public void testRejoinAfterControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException { + final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService"); + disconnectNode(2); + + getClientUtil().enableControllerService(controllerService); + reconnectNode(2); + waitForAllNodesConnected(); + + switchClientToNode(2); + waitFor(() -> { + final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId()); + return ControllerServiceState.ENABLED.name().equals(currentService.getComponent().getState()); + }); + } + + @Test + public void testRejoinAfterControllerServiceDisabled() throws NiFiClientException, IOException, InterruptedException { + final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService"); + getClientUtil().enableControllerService(controllerService); + + disconnectNode(2); + getClientUtil().disableControllerService(controllerService); + + reconnectNode(2); + waitForAllNodesConnected(); + + switchClientToNode(2); + waitFor(() -> { + final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId()); + return ControllerServiceState.DISABLED.name().equals(currentService.getComponent().getState()); + }); + } + private VersionedDataflow getNode2Flow() throws IOException { final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();