Repository: incubator-nifi Updated Branches: refs/heads/NIFI-250 048c5d9aa -> fc76a6165
NIFI-250: Load controller services in correct order instead of arbitrary order Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/baa0e74c Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/baa0e74c Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/baa0e74c Branch: refs/heads/NIFI-250 Commit: baa0e74cc37a6c74097da82ebf85d3b6e8e01d1a Parents: 883c4ac Author: Mark Payne <marka...@hotmail.com> Authored: Wed Mar 11 08:45:33 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Mar 11 08:45:33 2015 -0400 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 3 +- .../controller/StandardFlowSynchronizer.java | 48 ++------ .../service/ControllerServiceLoader.java | 112 ++++++++++++++----- 3 files changed, 95 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/baa0e74c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index f3dd0a0..4d731ab 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -184,7 +184,6 @@ import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateRevision; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; @@ -462,7 +461,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final byte[] serializedServices = clusterDataFlow.getControllerServices(); if ( serializedServices != null && serializedServices.length > 0 ) { - ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices)); + ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState()); } // start multicast broadcasting service, if configured http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/baa0e74c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index f1d169f..01ad941 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -53,6 +53,7 @@ import org.apache.nifi.connectable.Size; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; +import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; @@ -231,12 +232,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); if ( controllerServicesElement != null ) { final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); - for ( final Element serviceElement : serviceElements ) { - if ( !initialized || existingFlowEmpty ) { - addControllerService(controller, serviceElement, encryptor); - } else { - updateControllerService(controller, serviceElement, encryptor); - } + + if ( !initialized || existingFlowEmpty ) { + ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState); + } else { + for ( final Element serviceElement : serviceElements ) { + updateControllerService(controller, serviceElement, encryptor); + } } } @@ -345,40 +347,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { return baos.toByteArray(); } - private void addControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) { - final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); - - final ControllerServiceNode node = controller.createControllerService(dto.getType(), dto.getId(), false); - node.setName(dto.getName()); - node.setComments(dto.getComments()); - node.setAnnotationData(dto.getAnnotationData()); - - for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { - if (entry.getValue() == null) { - node.removeProperty(entry.getKey()); - } else { - node.setProperty(entry.getKey(), entry.getValue()); - } - } - - if ( autoResumeState ) { - final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); - final boolean enable = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); - if (enable) { - try { - controller.enableControllerService(node); - } catch (final Exception e) { - logger.error("Failed to enable " + node + " due to " + e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - - controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( - "Controller Service", Severity.ERROR.name(), "Could not start " + node + " due to " + e)); - } - } - } - } private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) { final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/baa0e74c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 575c375..32d9c46 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -20,7 +20,10 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; @@ -28,7 +31,13 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.nifi.controller.FlowFromDOMFactory; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.DomUtils; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.SAXException; @@ -42,12 +51,10 @@ public class ControllerServiceLoader { private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class); - public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream) throws IOException { + public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException { final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); documentBuilderFactory.setNamespaceAware(true); - final List<ControllerServiceNode> services = new ArrayList<>(); - try (final InputStream in = new BufferedInputStream(serializedStream)) { final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder(); @@ -80,34 +87,87 @@ public class ControllerServiceLoader { throw err; } }); - - //if controllerService.xml does not exist, create an empty file... + final Document document = builder.parse(in); final Element controllerServices = DomUtils.getChild(document.getDocumentElement(), "controllerServices"); - final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); - for (final Element serviceElement : serviceNodes) { - //get properties for the specific controller task - id, name, class, - //and schedulingPeriod must be set - final String serviceId = DomUtils.getChild(serviceElement, "id").getTextContent().trim(); - final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim(); + final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); + return new ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState)); + } catch (SAXException | ParserConfigurationException sxe) { + throw new IOException(sxe); + } + } + + public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) { + final Map<Element, ControllerServiceNode> nodeMap = new HashMap<>(); + for ( final Element serviceElement : serviceElements ) { + final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor); + nodeMap.put(serviceElement, serviceNode); + } + for ( final Map.Entry<Element, ControllerServiceNode> entry : nodeMap.entrySet() ) { + configureControllerService(entry.getValue(), entry.getKey(), encryptor); + } + + // Start services + if ( autoResumeState ) { + for ( final Map.Entry<Element, ControllerServiceNode> entry : nodeMap.entrySet() ) { + final Element controllerServiceElement = entry.getKey(); + final ControllerServiceNode serviceNode = entry.getValue(); - //set the class to be used for the configured controller task - final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false); - - //optional task-specific properties - for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) { - final String name = optionalProperty.getAttribute("name").trim(); - final String value = optionalProperty.getTextContent().trim(); - serviceNode.setProperty(name, value); + final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); + final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); + final boolean enable = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); + if (enable) { + try { + provider.enableReferencingServices(serviceNode); + } catch (final Exception e) { + logger.error("Failed to enable " + serviceNode + " due to " + e); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + + bulletinRepo.addBulletin(BulletinFactory.createBulletin( + "Controller Service", Severity.ERROR.name(), "Could not start services referencing " + serviceNode + " due to " + e)); + continue; + } + + try { + provider.enableControllerService(serviceNode); + } catch (final Exception e) { + logger.error("Failed to enable " + serviceNode + " due to " + e); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + + bulletinRepo.addBulletin(BulletinFactory.createBulletin( + "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); + } } - - services.add(serviceNode); - provider.enableControllerService(serviceNode); } - } catch (SAXException | ParserConfigurationException sxe) { - throw new IOException(sxe); } - - return services; + + return nodeMap.values(); + } + + + private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) { + final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); + + final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), false); + node.setName(dto.getName()); + node.setComments(dto.getComments()); + return node; + } + + private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) { + final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); + node.setAnnotationData(dto.getAnnotationData()); + + for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { + if (entry.getValue() == null) { + node.removeProperty(entry.getKey()); + } else { + node.setProperty(entry.getKey(), entry.getValue()); + } + } } }