Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 048c5d9aa -> fc76a6165


NIFI-250: Load controller services in correct order instead of arbitrary order


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/baa0e74c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/baa0e74c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/baa0e74c

Branch: refs/heads/NIFI-250
Commit: baa0e74cc37a6c74097da82ebf85d3b6e8e01d1a
Parents: 883c4ac
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Mar 11 08:45:33 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Mar 11 08:45:33 2015 -0400

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java |   3 +-
 .../controller/StandardFlowSynchronizer.java    |  48 ++------
 .../service/ControllerServiceLoader.java        | 112 ++++++++++++++-----
 3 files changed, 95 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/baa0e74c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index f3dd0a0..4d731ab 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -184,7 +184,6 @@ import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.UpdateRevision;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@@ -462,7 +461,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
                 final byte[] serializedServices = 
clusterDataFlow.getControllerServices();
                 if ( serializedServices != null && serializedServices.length > 
0 ) {
-                       ControllerServiceLoader.loadControllerServices(this, 
new ByteArrayInputStream(serializedServices));
+                       ControllerServiceLoader.loadControllerServices(this, 
new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, 
properties.getAutoResumeState());
                 }
                 
                 // start multicast broadcasting service, if configured

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/baa0e74c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index f1d169f..01ad941 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -53,6 +53,7 @@ import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.StringEncryptor;
@@ -231,12 +232,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                 final Element controllerServicesElement = (Element) 
DomUtils.getChild(rootElement, "controllerServices");
                    if ( controllerServicesElement != null ) {
                        final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServicesElement, 
"controllerService");
-                       for ( final Element serviceElement : serviceElements ) {
-                               if ( !initialized || existingFlowEmpty ) {
-                                       addControllerService(controller, 
serviceElement, encryptor);
-                               } else {
-                                       updateControllerService(controller, 
serviceElement, encryptor);
-                               }
+                       
+                       if ( !initialized || existingFlowEmpty ) {
+                           
ControllerServiceLoader.loadControllerServices(serviceElements, controller, 
encryptor, controller.getBulletinRepository(), autoResumeState);
+                       } else {
+                           for ( final Element serviceElement : 
serviceElements ) {
+                               updateControllerService(controller, 
serviceElement, encryptor);
+                           }
                        }
                 }
 
@@ -345,40 +347,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         return baos.toByteArray();
     }
     
-    private void addControllerService(final FlowController controller, final 
Element controllerServiceElement, final StringEncryptor encryptor) {
-       final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-       
-       final ControllerServiceNode node = 
controller.createControllerService(dto.getType(), dto.getId(), false);
-       node.setName(dto.getName());
-       node.setComments(dto.getComments());
-       node.setAnnotationData(dto.getAnnotationData());
-       
-        for (final Map.Entry<String, String> entry : 
dto.getProperties().entrySet()) {
-            if (entry.getValue() == null) {
-                node.removeProperty(entry.getKey());
-            } else {
-                node.setProperty(entry.getKey(), entry.getValue());
-            }
-        }
-
-        if ( autoResumeState ) {
-            final ControllerServiceState state = 
ControllerServiceState.valueOf(dto.getState());
-            final boolean enable = (state == ControllerServiceState.ENABLED || 
state == ControllerServiceState.ENABLING);
-               if (enable) {
-                       try {
-                               controller.enableControllerService(node);
-                       } catch (final Exception e) {
-                               logger.error("Failed to enable " + node + " due 
to " + e);
-                               if ( logger.isDebugEnabled() ) {
-                                       logger.error("", e);
-                               }
-                               
-                               
controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
-                                               "Controller Service", 
Severity.ERROR.name(), "Could not start " + node + " due to " + e));
-                       }
-               }
-        }
-    }
     
     private void updateControllerService(final FlowController controller, 
final Element controllerServiceElement, final StringEncryptor encryptor) {
        final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/baa0e74c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 575c375..32d9c46 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -20,7 +20,10 @@ import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -28,7 +31,13 @@ import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.nifi.controller.FlowFromDOMFactory;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.DomUtils;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.xml.sax.SAXException;
@@ -42,12 +51,10 @@ public class ControllerServiceLoader {
     private static final Log logger = 
LogFactory.getLog(ControllerServiceLoader.class);
 
 
-    public static List<ControllerServiceNode> loadControllerServices(final 
ControllerServiceProvider provider, final InputStream serializedStream) throws 
IOException {
+    public static List<ControllerServiceNode> loadControllerServices(final 
ControllerServiceProvider provider, final InputStream serializedStream, final 
StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean 
autoResumeState) throws IOException {
         final DocumentBuilderFactory documentBuilderFactory = 
DocumentBuilderFactory.newInstance();
         documentBuilderFactory.setNamespaceAware(true);
 
-        final List<ControllerServiceNode> services = new ArrayList<>();
-
         try (final InputStream in = new BufferedInputStream(serializedStream)) 
{
             final DocumentBuilder builder = 
documentBuilderFactory.newDocumentBuilder();
 
@@ -80,34 +87,87 @@ public class ControllerServiceLoader {
                     throw err;
                 }
             });
-
-            //if controllerService.xml does not exist, create an empty file...
+            
             final Document document = builder.parse(in);
             final Element controllerServices = 
DomUtils.getChild(document.getDocumentElement(), "controllerServices");
-            final List<Element> serviceNodes = 
DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
-            for (final Element serviceElement : serviceNodes) {
-                //get properties for the specific controller task - id, name, 
class,
-                //and schedulingPeriod must be set
-                final String serviceId = DomUtils.getChild(serviceElement, 
"id").getTextContent().trim();
-                final String serviceClass = DomUtils.getChild(serviceElement, 
"class").getTextContent().trim();
+            final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
+            return new 
ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, 
provider, encryptor, bulletinRepo, autoResumeState));
+        } catch (SAXException | ParserConfigurationException sxe) {
+            throw new IOException(sxe);
+        }
+    }
+    
+    public static Collection<ControllerServiceNode> 
loadControllerServices(final List<Element> serviceElements, final 
ControllerServiceProvider provider, final StringEncryptor encryptor, final 
BulletinRepository bulletinRepo, final boolean autoResumeState) {
+        final Map<Element, ControllerServiceNode> nodeMap = new HashMap<>();
+        for ( final Element serviceElement : serviceElements ) {
+            final ControllerServiceNode serviceNode = 
createControllerService(provider, serviceElement, encryptor);
+            nodeMap.put(serviceElement, serviceNode);
+        }
+        for ( final Map.Entry<Element, ControllerServiceNode> entry : 
nodeMap.entrySet() ) {
+            configureControllerService(entry.getValue(), entry.getKey(), 
encryptor);
+        }
+        
+        // Start services
+        if ( autoResumeState ) {
+            for ( final Map.Entry<Element, ControllerServiceNode> entry : 
nodeMap.entrySet() ) {
+                final Element controllerServiceElement = entry.getKey();
+                final ControllerServiceNode serviceNode = entry.getValue();
                 
-                //set the class to be used for the configured controller task
-                final ControllerServiceNode serviceNode = 
provider.createControllerService(serviceClass, serviceId, false);
-
-                //optional task-specific properties
-                for (final Element optionalProperty : 
DomUtils.getChildElementsByTagName(serviceElement, "property")) {
-                    final String name = 
optionalProperty.getAttribute("name").trim();
-                    final String value = 
optionalProperty.getTextContent().trim();
-                    serviceNode.setProperty(name, value);
+                final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+                final ControllerServiceState state = 
ControllerServiceState.valueOf(dto.getState());
+                final boolean enable = (state == 
ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
+                if (enable) {
+                    try {
+                        provider.enableReferencingServices(serviceNode);
+                    } catch (final Exception e) {
+                        logger.error("Failed to enable " + serviceNode + " due 
to " + e);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.error("", e);
+                        }
+                        
+                        
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
+                                "Controller Service", Severity.ERROR.name(), 
"Could not start services referencing " + serviceNode + " due to " + e));
+                        continue;
+                    }
+                    
+                    try {
+                        provider.enableControllerService(serviceNode);
+                    } catch (final Exception e) {
+                        logger.error("Failed to enable " + serviceNode + " due 
to " + e);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.error("", e);
+                        }
+                        
+                        
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
+                                "Controller Service", Severity.ERROR.name(), 
"Could not start " + serviceNode + " due to " + e));
+                    }
                 }
-
-                services.add(serviceNode);
-                provider.enableControllerService(serviceNode);
             }
-        } catch (SAXException | ParserConfigurationException sxe) {
-            throw new IOException(sxe);
         }
-
-        return services;
+        
+        return nodeMap.values();
+    }
+    
+    
+    private static ControllerServiceNode createControllerService(final 
ControllerServiceProvider provider, final Element controllerServiceElement, 
final StringEncryptor encryptor) {
+        final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+        
+        final ControllerServiceNode node = 
provider.createControllerService(dto.getType(), dto.getId(), false);
+        node.setName(dto.getName());
+        node.setComments(dto.getComments());
+        return node;
+    }
+    
+    private static void configureControllerService(final ControllerServiceNode 
node, final Element controllerServiceElement, final StringEncryptor encryptor) {
+        final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+        node.setAnnotationData(dto.getAnnotationData());
+        
+        for (final Map.Entry<String, String> entry : 
dto.getProperties().entrySet()) {
+            if (entry.getValue() == null) {
+                node.removeProperty(entry.getKey());
+            } else {
+                node.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
     }
 }

Reply via email to