Repository: incubator-nifi Updated Branches: refs/heads/master [created] 4d998c12c
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java new file mode 100644 index 0000000..17a1702 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -0,0 +1,1026 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; + +import org.apache.nifi.cluster.protocol.DataFlow; +import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.file.FileUtils; +import org.apache.nifi.fingerprint.FingerprintException; +import org.apache.nifi.fingerprint.FingerprintFactory; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.api.dto.ConnectableDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.FunnelDTO; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.nifi.encrypt.StringEncryptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +/** + * @author unattributed + */ +public class StandardFlowSynchronizer implements FlowSynchronizer { + + private static final Logger logger = LoggerFactory.getLogger(StandardFlowSynchronizer.class); + public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd"); + private final StringEncryptor encryptor; + + public StandardFlowSynchronizer(final StringEncryptor encryptor) { + this.encryptor = encryptor; + } + + public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) { + if (dataFlow == null || dataFlow.getFlow() == null || dataFlow.getFlow().length == 0) { + return true; + } + + final Document document = parseFlowBytes(dataFlow.getFlow()); + final Element rootElement = document.getDocumentElement(); + + final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); + final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor); + return isEmpty(rootGroupDto); + } + + @Override + public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { + // get the controller's root group + final ProcessGroup rootGroup = controller.getGroup(controller.getRootGroupId()); + + // handle corner cases involving no proposed flow + if (proposedFlow == null) { + if (rootGroup.isEmpty()) { + return; // no sync to perform + } else { + throw new UninheritableFlowException("Proposed configuration is empty, but the controller contains a data flow."); + } + } + + // determine if the controller has been initialized + final boolean initialized = controller.isInitialized(); + logger.debug("Synching FlowController with proposed flow: Controller is Initialized = {}", initialized); + + // serialize controller state to bytes + final byte[] existingFlow; + final boolean existingFlowEmpty; + try { + if (initialized) { + existingFlow = toBytes(controller); + existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty(); + } else { + existingFlow = readFlowFromDisk(); + if (existingFlow == null || existingFlow.length == 0) { + existingFlowEmpty = true; + } else { + final Document document = parseFlowBytes(existingFlow); + final Element rootElement = document.getDocumentElement(); + + logger.trace("Setting controller thread counts"); + final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount"); + if (maxThreadCount == null) { + controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount")); + controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount")); + } else { + controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3); + controller.setMaxEventDrivenThreadCount(maxThreadCount / 3); + } + + logger.trace("Parsing process group from DOM"); + final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); + final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor); + existingFlowEmpty = isEmpty(rootGroupDto); + logger.debug("Existing Flow Empty = {}", existingFlowEmpty); + } + } + } catch (final IOException e) { + throw new FlowSerializationException(e); + } + + logger.trace("Exporting templates from controller"); + final byte[] existingTemplates = controller.getTemplateManager().export(); + logger.trace("Exporting snippets from controller"); + final byte[] existingSnippets = controller.getSnippetManager().export(); + + final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingTemplates, existingSnippets); + + final boolean existingTemplatesEmpty = existingTemplates == null || existingTemplates.length == 0; + + // check that the proposed flow is inheritable by the controller + try { + if (!existingFlowEmpty) { + logger.trace("Checking flow inheritability"); + final String problemInheriting = checkFlowInheritability(existingDataFlow, proposedFlow, controller); + if (problemInheriting != null) { + throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheriting); + } + } + if (!existingTemplatesEmpty) { + logger.trace("Checking template inheritability"); + final String problemInheriting = checkTemplateInheritability(existingDataFlow, proposedFlow); + if (problemInheriting != null) { + throw new UninheritableFlowException("Proposed configuration is not inheritable by the flow controller because of flow differences: " + problemInheriting); + } + } + } catch (final FingerprintException fe) { + throw new FlowSerializationException("Failed to generate flow fingerprints", fe); + } + + // create document by parsing proposed flow bytes + logger.trace("Parsing proposed flow bytes as DOM document"); + final Document configuration = parseFlowBytes(proposedFlow.getFlow()); + + // attempt to sync controller with proposed flow + try { + if (configuration != null) { + // get the root element + final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0); + + // set controller config + logger.trace("Updating flow config"); + final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount"); + if (maxThreadCount == null) { + controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount")); + controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount")); + } else { + controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3); + controller.setMaxEventDrivenThreadCount(maxThreadCount / 3); + } + + // get the root group XML element + final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); + + // if this controller isn't initialized or its emtpy, add the root group, otherwise update + if (!initialized || existingFlowEmpty) { + logger.trace("Adding root process group"); + addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); + } else { + logger.trace("Updating root process group"); + updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); + } + } + + logger.trace("Synching templates"); + if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) { + // need to load templates + final TemplateManager templateManager = controller.getTemplateManager(); + final List<Template> proposedTemplateList = TemplateManager.parseBytes(proposedFlow.getTemplates()); + for (final Template template : proposedTemplateList) { + templateManager.addTemplate(template.getDetails()); + } + } + + // clear the snippets that are currently in memory + logger.trace("Clearing existing snippets"); + final SnippetManager snippetManager = controller.getSnippetManager(); + snippetManager.clear(); + + // if proposed flow has any snippets, load them + logger.trace("Loading proposed snippets"); + final byte[] proposedSnippets = proposedFlow.getSnippets(); + if (proposedSnippets != null && proposedSnippets.length > 0) { + for (final StandardSnippet snippet : SnippetManager.parseBytes(proposedSnippets)) { + snippetManager.addSnippet(snippet); + } + } + + logger.debug("Finished synching flows"); + } catch (final Exception ex) { + throw new FlowSynchronizationException(ex); + } + } + + private static boolean isEmpty(final ProcessGroupDTO dto) { + if (dto == null) { + return true; + } + + final FlowSnippetDTO contents = dto.getContents(); + if (contents == null) { + return true; + } + + return CollectionUtils.isEmpty(contents.getProcessors()) + && CollectionUtils.isEmpty(contents.getConnections()) + && CollectionUtils.isEmpty(contents.getFunnels()) + && CollectionUtils.isEmpty(contents.getLabels()) + && CollectionUtils.isEmpty(contents.getOutputPorts()) + && CollectionUtils.isEmpty(contents.getProcessGroups()) + && CollectionUtils.isEmpty(contents.getProcessors()) + && CollectionUtils.isEmpty(contents.getRemoteProcessGroups()); + } + + private static Document parseFlowBytes(final byte[] flow) throws FlowSerializationException { + // create document by parsing proposed flow bytes + try { + // create validating document builder + final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + final Schema schema = schemaFactory.newSchema(FLOW_XSD_RESOURCE); + final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + docFactory.setSchema(schema); + final DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); + + // parse flow + return (flow == null || flow.length == 0) ? null : docBuilder.parse(new ByteArrayInputStream(flow)); + } catch (final SAXException | ParserConfigurationException | IOException ex) { + throw new FlowSerializationException(ex); + } + } + + private byte[] readFlowFromDisk() throws IOException { + final Path flowPath = NiFiProperties.getInstance().getFlowConfigurationFile().toPath(); + if (!Files.exists(flowPath) || Files.size(flowPath) == 0) { + return new byte[0]; + } + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ); + final InputStream gzipIn = new GZIPInputStream(in)) { + FileUtils.copy(gzipIn, baos); + } + + return baos.toByteArray(); + } + + private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException { + + // get the parent group ID + final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); + + // get the process group + final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor); + + // update the process group + if (parentId == null) { + + /* + * Labels are not included in the "inherit flow" algorithm, so we cannot + * blindly update them because they may not exist in the current flow. + * Therefore, we first remove all labels, and then let the updating + * process add labels defined in the new flow. + */ + final ProcessGroup root = controller.getGroup(controller.getRootGroupId()); + for (final Label label : root.findAllLabels()) { + label.getProcessGroup().removeLabel(label); + } + } + + // update the process group + controller.updateProcessGroup(processGroupDto); + + // get the real process group and ID + final ProcessGroup processGroup = controller.getGroup(processGroupDto.getId()); + + // processors & ports cannot be updated - they must be the same. Except for the scheduled state. + final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor"); + for (final Element processorElement : processorNodeList) { + final ProcessorDTO dto = FlowFromDOMFactory.getProcessor(processorElement, encryptor); + final ProcessorNode procNode = processGroup.getProcessor(dto.getId()); + + if (!procNode.getScheduledState().name().equals(dto.getState())) { + try { + switch (ScheduledState.valueOf(dto.getState())) { + case DISABLED: + // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), + // and then we have to disable it. + procNode.getProcessGroup().stopProcessor(procNode); + procNode.getProcessGroup().disableProcessor(procNode); + break; + case RUNNING: + // we want to run now. Make sure processor is not disabled and then start it. + procNode.getProcessGroup().enableProcessor(procNode); + procNode.getProcessGroup().startProcessor(procNode); + break; + case STOPPED: + if (procNode.getScheduledState() == ScheduledState.DISABLED) { + procNode.getProcessGroup().enableProcessor(procNode); + } else if (procNode.getScheduledState() == ScheduledState.RUNNING) { + procNode.getProcessGroup().stopProcessor(procNode); + } + break; + } + } catch (final IllegalStateException ise) { + logger.error("Failed to change Scheduled State of {} from {} to {} due to {}", procNode, procNode.getScheduledState().name(), dto.getState(), ise.toString()); + logger.error("", ise); + + // create bulletin for the Processor Node + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(procNode, "Node Reconnection", Severity.ERROR.name(), + "Failed to change Scheduled State of " + procNode + " from " + procNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString())); + + // create bulletin at Controller level. + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(), + "Failed to change Scheduled State of " + procNode + " from " + procNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString())); + } + } + } + + final List<Element> inputPortList = getChildrenByTagName(processGroupElement, "inputPort"); + for (final Element portElement : inputPortList) { + final PortDTO dto = FlowFromDOMFactory.getPort(portElement); + final Port port = processGroup.getInputPort(dto.getId()); + + if (!port.getScheduledState().name().equals(dto.getState())) { + switch (ScheduledState.valueOf(dto.getState())) { + case DISABLED: + // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), + // and then we have to disable it. + port.getProcessGroup().stopInputPort(port); + port.getProcessGroup().disableInputPort(port); + break; + case RUNNING: + // we want to run now. Make sure processor is not disabled and then start it. + port.getProcessGroup().enableInputPort(port); + port.getProcessGroup().startInputPort(port); + break; + case STOPPED: + if (port.getScheduledState() == ScheduledState.DISABLED) { + port.getProcessGroup().enableInputPort(port); + } else if (port.getScheduledState() == ScheduledState.RUNNING) { + port.getProcessGroup().stopInputPort(port); + } + break; + } + } + } + + final List<Element> outputPortList = getChildrenByTagName(processGroupElement, "outputPort"); + for (final Element portElement : outputPortList) { + final PortDTO dto = FlowFromDOMFactory.getPort(portElement); + final Port port = processGroup.getOutputPort(dto.getId()); + + if (!port.getScheduledState().name().equals(dto.getState())) { + switch (ScheduledState.valueOf(dto.getState())) { + case DISABLED: + // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), + // and then we have to disable it. + port.getProcessGroup().stopOutputPort(port); + port.getProcessGroup().disableOutputPort(port); + break; + case RUNNING: + // we want to run now. Make sure processor is not disabled and then start it. + port.getProcessGroup().enableOutputPort(port); + port.getProcessGroup().startOutputPort(port); + break; + case STOPPED: + if (port.getScheduledState() == ScheduledState.DISABLED) { + port.getProcessGroup().enableOutputPort(port); + } else if (port.getScheduledState() == ScheduledState.RUNNING) { + port.getProcessGroup().stopOutputPort(port); + } + break; + } + } + } + + // add labels + final List<Element> labelNodeList = getChildrenByTagName(processGroupElement, "label"); + for (final Element labelElement : labelNodeList) { + final LabelDTO labelDTO = FlowFromDOMFactory.getLabel(labelElement); + final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel()); + label.setStyle(labelDTO.getStyle()); + label.setPosition(new Position(labelDTO.getPosition().getX(), labelDTO.getPosition().getY())); + if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) { + label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); + } + + processGroup.addLabel(label); + } + + // update nested process groups (recursively) + final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup"); + for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) { + updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor); + } + + // update connections + final List<Element> connectionNodeList = getChildrenByTagName(processGroupElement, "connection"); + for (final Element connectionElement : connectionNodeList) { + final ConnectionDTO dto = FlowFromDOMFactory.getConnection(connectionElement); + + final Connection connection = processGroup.getConnection(dto.getId()); + connection.setName(dto.getName()); + connection.setProcessGroup(processGroup); + + if (dto.getLabelIndex() != null) { + connection.setLabelIndex(dto.getLabelIndex()); + } + if (dto.getzIndex() != null) { + connection.setZIndex(dto.getzIndex()); + } + + final List<Position> bendPoints = new ArrayList<>(); + for (final PositionDTO bend : dto.getBends()) { + bendPoints.add(new Position(bend.getX(), bend.getY())); + } + connection.setBendPoints(bendPoints); + + List<FlowFilePrioritizer> newPrioritizers = null; + final List<String> prioritizers = dto.getPrioritizers(); + if (prioritizers != null) { + final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers); + newPrioritizers = new ArrayList<>(); + for (final String className : newPrioritizersClasses) { + try { + newPrioritizers.add(controller.createPrioritizer(className)); + } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e); + } + } + } + + if (newPrioritizers != null) { + connection.getFlowFileQueue().setPriorities(newPrioritizers); + } + + if (dto.getBackPressureObjectThreshold() != null) { + connection.getFlowFileQueue().setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold()); + } + + if (dto.getBackPressureDataSizeThreshold() != null && !dto.getBackPressureDataSizeThreshold().trim().isEmpty()) { + connection.getFlowFileQueue().setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold()); + } + + if (dto.getFlowFileExpiration() != null) { + connection.getFlowFileQueue().setFlowFileExpiration(dto.getFlowFileExpiration()); + } + } + + return processGroup; + } + + private Position toPosition(final PositionDTO dto) { + return new Position(dto.getX(), dto.getY()); + } + + private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller) throws ProcessorInstantiationException { + final ProcessorConfigDTO config = processorDTO.getConfig(); + procNode.setPosition(toPosition(processorDTO.getPosition())); + procNode.setName(processorDTO.getName()); + procNode.setStyle(processorDTO.getStyle()); + procNode.setProcessGroup(processGroup); + procNode.setComments(config.getComments()); + procNode.setLossTolerant(config.isLossTolerant()); + procNode.setPenalizationPeriod(config.getPenaltyDuration()); + procNode.setYieldPeriod(config.getYieldDuration()); + procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); + + if (config.getSchedulingStrategy() != null) { + procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); + } + + // must set scheduling strategy before these two + procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); + procNode.setScheduldingPeriod(config.getSchedulingPeriod()); + if (config.getRunDurationMillis() != null) { + procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS); + } + + procNode.setAnnotationData(config.getAnnotationData()); + + if (config.getAutoTerminatedRelationships() != null) { + final Set<Relationship> relationships = new HashSet<>(); + for (final String rel : config.getAutoTerminatedRelationships()) { + relationships.add(procNode.getRelationship(rel)); + } + procNode.setAutoTerminatedRelationships(relationships); + } + + for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) { + if (entry.getValue() == null) { + procNode.removeProperty(entry.getKey()); + } else { + procNode.setProperty(entry.getKey(), entry.getValue()); + } + } + + final ScheduledState scheduledState = ScheduledState.valueOf(processorDTO.getState()); + if (ScheduledState.RUNNING.equals(scheduledState)) { + controller.startProcessor(processGroup.getIdentifier(), procNode.getIdentifier()); + } else if (ScheduledState.DISABLED.equals(scheduledState)) { + processGroup.disableProcessor(procNode); + } + } + + private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException { + // get the parent group ID + final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); + + // add the process group + final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor); + final ProcessGroup processGroup = controller.createProcessGroup(processGroupDTO.getId()); + processGroup.setComments(processGroupDTO.getComments()); + processGroup.setPosition(toPosition(processGroupDTO.getPosition())); + processGroup.setName(processGroupDTO.getName()); + processGroup.setParent(parentGroup); + if (parentGroup == null) { + controller.setRootGroup(processGroup); + } else { + parentGroup.addProcessGroup(processGroup); + } + + // add processors + final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor"); + for (final Element processorElement : processorNodeList) { + final ProcessorDTO processorDTO = FlowFromDOMFactory.getProcessor(processorElement, encryptor); + final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId()); + processGroup.addProcessor(procNode); + updateProcessor(procNode, processorDTO, processGroup, controller); + } + + // add input ports + final List<Element> inputPortNodeList = getChildrenByTagName(processGroupElement, "inputPort"); + for (final Element inputPortElement : inputPortNodeList) { + final PortDTO portDTO = FlowFromDOMFactory.getPort(inputPortElement); + + final Port port; + if (processGroup.isRootGroup()) { + port = controller.createRemoteInputPort(portDTO.getId(), portDTO.getName()); + } else { + port = controller.createLocalInputPort(portDTO.getId(), portDTO.getName()); + } + + port.setPosition(toPosition(portDTO.getPosition())); + port.setComments(portDTO.getComments()); + port.setProcessGroup(processGroup); + + final Set<String> userControls = portDTO.getUserAccessControl(); + if (userControls != null && !userControls.isEmpty()) { + if (!(port instanceof RootGroupPort)) { + throw new IllegalStateException("Attempting to add User Access Controls to " + port + ", but it is not a RootGroupPort"); + } + ((RootGroupPort) port).setUserAccessControl(userControls); + } + final Set<String> groupControls = portDTO.getGroupAccessControl(); + if (groupControls != null && !groupControls.isEmpty()) { + if (!(port instanceof RootGroupPort)) { + throw new IllegalStateException("Attempting to add Group Access Controls to " + port + ", but it is not a RootGroupPort"); + } + ((RootGroupPort) port).setGroupAccessControl(groupControls); + } + + processGroup.addInputPort(port); + if (portDTO.getConcurrentlySchedulableTaskCount() != null) { + port.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); + } + + final ScheduledState scheduledState = ScheduledState.valueOf(portDTO.getState()); + if (ScheduledState.RUNNING.equals(scheduledState)) { + controller.startConnectable(port); + } else if (ScheduledState.DISABLED.equals(scheduledState)) { + processGroup.disableInputPort(port); + } + } + + // add output ports + final List<Element> outputPortNodeList = getChildrenByTagName(processGroupElement, "outputPort"); + for (final Element outputPortElement : outputPortNodeList) { + final PortDTO portDTO = FlowFromDOMFactory.getPort(outputPortElement); + + final Port port; + if (processGroup.isRootGroup()) { + port = controller.createRemoteOutputPort(portDTO.getId(), portDTO.getName()); + } else { + port = controller.createLocalOutputPort(portDTO.getId(), portDTO.getName()); + } + port.setPosition(toPosition(portDTO.getPosition())); + port.setComments(portDTO.getComments()); + port.setProcessGroup(processGroup); + + final Set<String> userControls = portDTO.getUserAccessControl(); + if (userControls != null && !userControls.isEmpty()) { + if (!(port instanceof RootGroupPort)) { + throw new IllegalStateException("Attempting to add User Access Controls to " + port + ", but it is not a RootGroupPort"); + } + ((RootGroupPort) port).setUserAccessControl(userControls); + } + final Set<String> groupControls = portDTO.getGroupAccessControl(); + if (groupControls != null && !groupControls.isEmpty()) { + if (!(port instanceof RootGroupPort)) { + throw new IllegalStateException("Attempting to add Group Access Controls to " + port + ", but it is not a RootGroupPort"); + } + ((RootGroupPort) port).setGroupAccessControl(groupControls); + } + + processGroup.addOutputPort(port); + if (portDTO.getConcurrentlySchedulableTaskCount() != null) { + port.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); + } + + final ScheduledState scheduledState = ScheduledState.valueOf(portDTO.getState()); + if (ScheduledState.RUNNING.equals(scheduledState)) { + controller.startConnectable(port); + } else if (ScheduledState.DISABLED.equals(scheduledState)) { + processGroup.disableOutputPort(port); + } + } + + // add funnels + final List<Element> funnelNodeList = getChildrenByTagName(processGroupElement, "funnel"); + for (final Element funnelElement : funnelNodeList) { + final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement); + final Funnel funnel = controller.createFunnel(funnelDTO.getId()); + funnel.setPosition(toPosition(funnelDTO.getPosition())); + processGroup.addFunnel(funnel); + controller.startConnectable(funnel); + } + + // add labels + final List<Element> labelNodeList = getChildrenByTagName(processGroupElement, "label"); + for (final Element labelElement : labelNodeList) { + final LabelDTO labelDTO = FlowFromDOMFactory.getLabel(labelElement); + final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel()); + label.setStyle(labelDTO.getStyle()); + + label.setPosition(toPosition(labelDTO.getPosition())); + label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); + processGroup.addLabel(label); + } + + // add nested process groups (recursively) + final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup"); + for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) { + addProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor); + } + + // add remote process group + final List<Element> remoteProcessGroupNodeList = getChildrenByTagName(processGroupElement, "remoteProcessGroup"); + for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) { + final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement); + final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUri()); + remoteGroup.setComments(remoteGroupDto.getComments()); + remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition())); + final String name = remoteGroupDto.getName(); + if (name != null && !name.trim().isEmpty()) { + remoteGroup.setName(name); + } + remoteGroup.setProcessGroup(processGroup); + remoteGroup.setCommunicationsTimeout(remoteGroupDto.getCommunicationsTimeout()); + + if (remoteGroupDto.getYieldDuration() != null) { + remoteGroup.setYieldDuration(remoteGroupDto.getYieldDuration()); + } + + final Set<RemoteProcessGroupPortDescriptor> inputPorts = new HashSet<>(); + for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) { + inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); + } + remoteGroup.setInputPorts(inputPorts); + + final Set<RemoteProcessGroupPortDescriptor> outputPorts = new HashSet<>(); + for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "outputPort")) { + outputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); + } + remoteGroup.setOutputPorts(outputPorts); + processGroup.addRemoteProcessGroup(remoteGroup); + + for (final RemoteProcessGroupPortDescriptor remoteGroupPortDTO : outputPorts) { + final RemoteGroupPort port = remoteGroup.getOutputPort(remoteGroupPortDTO.getId()); + if (Boolean.TRUE.equals(remoteGroupPortDTO.isTransmitting())) { + controller.startTransmitting(port); + } + } + for (final RemoteProcessGroupPortDescriptor remoteGroupPortDTO : inputPorts) { + final RemoteGroupPort port = remoteGroup.getInputPort(remoteGroupPortDTO.getId()); + if (Boolean.TRUE.equals(remoteGroupPortDTO.isTransmitting())) { + controller.startTransmitting(port); + } + } + } + + // add connections + final List<Element> connectionNodeList = getChildrenByTagName(processGroupElement, "connection"); + for (final Element connectionElement : connectionNodeList) { + final ConnectionDTO dto = FlowFromDOMFactory.getConnection(connectionElement); + + final Connectable source; + final ConnectableDTO sourceDto = dto.getSource(); + if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDto.getType())) { + final RemoteProcessGroup remoteGroup = processGroup.getRemoteProcessGroup(sourceDto.getGroupId()); + source = remoteGroup.getOutputPort(sourceDto.getId()); + } else { + final ProcessGroup sourceGroup = controller.getGroup(sourceDto.getGroupId()); + if (sourceGroup == null) { + throw new RuntimeException("Found Invalid ProcessGroup ID for Source: " + dto.getSource().getGroupId()); + } + + source = sourceGroup.getConnectable(sourceDto.getId()); + } + if (source == null) { + throw new RuntimeException("Found Invalid Connectable ID for Source: " + dto.getSource().getId()); + } + + final Connectable destination; + final ConnectableDTO destinationDto = dto.getDestination(); + if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) { + final RemoteProcessGroup remoteGroup = processGroup.getRemoteProcessGroup(destinationDto.getGroupId()); + destination = remoteGroup.getInputPort(destinationDto.getId()); + } else { + final ProcessGroup destinationGroup = controller.getGroup(destinationDto.getGroupId()); + if (destinationGroup == null) { + throw new RuntimeException("Found Invalid ProcessGroup ID for Destination: " + dto.getDestination().getGroupId()); + } + + destination = destinationGroup.getConnectable(destinationDto.getId()); + } + if (destination == null) { + throw new RuntimeException("Found Invalid Connectable ID for Destination: " + dto.getDestination().getId()); + } + + final Connection connection = controller.createConnection(dto.getId(), dto.getName(), source, destination, dto.getSelectedRelationships()); + connection.setProcessGroup(processGroup); + + final List<Position> bendPoints = new ArrayList<>(); + for (final PositionDTO bend : dto.getBends()) { + bendPoints.add(new Position(bend.getX(), bend.getY())); + } + connection.setBendPoints(bendPoints); + + final Long zIndex = dto.getzIndex(); + if (zIndex != null) { + connection.setZIndex(zIndex); + } + + if (dto.getLabelIndex() != null) { + connection.setLabelIndex(dto.getLabelIndex()); + } + + List<FlowFilePrioritizer> newPrioritizers = null; + final List<String> prioritizers = dto.getPrioritizers(); + if (prioritizers != null) { + final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers); + newPrioritizers = new ArrayList<>(); + for (final String className : newPrioritizersClasses) { + try { + newPrioritizers.add(controller.createPrioritizer(className)); + } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e); + } + } + } + if (newPrioritizers != null) { + connection.getFlowFileQueue().setPriorities(newPrioritizers); + } + + if (dto.getBackPressureObjectThreshold() != null) { + connection.getFlowFileQueue().setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold()); + } + if (dto.getBackPressureDataSizeThreshold() != null) { + connection.getFlowFileQueue().setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold()); + } + if (dto.getFlowFileExpiration() != null) { + connection.getFlowFileQueue().setFlowFileExpiration(dto.getFlowFileExpiration()); + } + + processGroup.addConnection(connection); + } + + return processGroup; + } + + /** + * Returns true if the given controller can inherit the proposed flow + * without orphaning flow files. + * + * @param existingFlow + * @param controller the running controller + * @param proposedFlow the flow to inherit + * + * @return null if the controller can inherit the specified flow, an + * explanation of why it cannot be inherited otherwise + * + * @throws FingerprintException if flow fingerprints could not be generated + */ + public String checkFlowInheritability(final DataFlow existingFlow, final DataFlow proposedFlow, final FlowController controller) throws FingerprintException { + if (existingFlow == null) { + return null; // no existing flow, so equivalent to proposed flow + } + + return checkFlowInheritability(existingFlow.getFlow(), proposedFlow.getFlow(), controller); + } + + private String checkFlowInheritability(final byte[] existingFlow, final byte[] proposedFlow, final FlowController controller) { + if (existingFlow == null) { + return null; // no existing flow, so equivalent to proposed flow + } + + // check if the Flow is inheritable + final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor); + final String existingFlowFingerprintBeforeHash = fingerprintFactory.createFingerprint(existingFlow, controller); + if (existingFlowFingerprintBeforeHash.trim().isEmpty()) { + return null; // no existing flow, so equivalent to proposed flow + } + + if (proposedFlow == null || proposedFlow.length == 0) { + return "Proposed Flow was empty but Current Flow is not"; // existing flow is not empty and proposed flow is empty (we could orphan flowfiles) + } + + final String proposedFlowFingerprintBeforeHash = fingerprintFactory.createFingerprint(proposedFlow, controller); + if (proposedFlowFingerprintBeforeHash.trim().isEmpty()) { + return "Proposed Flow was empty but Current Flow is not"; // existing flow is not empty and proposed flow is empty (we could orphan flowfiles) + } + + final boolean inheritable = existingFlowFingerprintBeforeHash.equals(proposedFlowFingerprintBeforeHash); + if (!inheritable) { + return findFirstDiscrepancy(existingFlowFingerprintBeforeHash, proposedFlowFingerprintBeforeHash, "Flows"); + } + + return null; + } + + /** + * Returns true if the given controller can inherit the proposed flow + * without orphaning flow files. + * + * @param existingFlow + * @param proposedFlow the flow to inherit + * + * @return null if the controller can inherit the specified flow, an + * explanation of why it cannot be inherited otherwise + * + * @throws FingerprintException if flow fingerprints could not be generated + */ + public String checkTemplateInheritability(final DataFlow existingFlow, final DataFlow proposedFlow) throws FingerprintException { + if (existingFlow == null) { + return null; // no existing flow, so equivalent to proposed flow + } + + // check if the Flow is inheritable + final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor); + // check if the Templates are inheritable + final byte[] existingTemplateBytes = existingFlow.getTemplates(); + if (existingTemplateBytes == null || existingTemplateBytes.length == 0) { + return null; + } + + final List<Template> existingTemplates = TemplateManager.parseBytes(existingTemplateBytes); + final String existingTemplateFingerprint = fingerprintFactory.createFingerprint(existingTemplates); + if (existingTemplateFingerprint.trim().isEmpty()) { + return null; + } + + final byte[] proposedTemplateBytes = proposedFlow.getTemplates(); + if (proposedTemplateBytes == null || proposedTemplateBytes.length == 0) { + return "Proposed Flow does not contain any Templates but Current Flow does"; + } + + final List<Template> proposedTemplates = TemplateManager.parseBytes(proposedTemplateBytes); + final String proposedTemplateFingerprint = fingerprintFactory.createFingerprint(proposedTemplates); + if (proposedTemplateFingerprint.trim().isEmpty()) { + return "Proposed Flow does not contain any Templates but Current Flow does"; + } + + try { + final String existingTemplateMd5 = fingerprintFactory.md5Hash(existingTemplateFingerprint); + final String proposedTemplateMd5 = fingerprintFactory.md5Hash(proposedTemplateFingerprint); + + if (!existingTemplateMd5.equals(proposedTemplateMd5)) { + return findFirstDiscrepancy(existingTemplateFingerprint, proposedTemplateFingerprint, "Templates"); + } + } catch (final NoSuchAlgorithmException e) { + throw new FingerprintException(e); + } + + return null; + } + + private String findFirstDiscrepancy(final String existing, final String proposed, final String comparisonDescription) { + final int shortestFileLength = Math.min(existing.length(), proposed.length()); + for (int i = 0; i < shortestFileLength; i++) { + if (existing.charAt(i) != proposed.charAt(i)) { + final String formattedExistingDelta = formatFlowDiscrepancy(existing, i, 100); + final String formattedProposedDelta = formatFlowDiscrepancy(proposed, i, 100); + return String.format("Found difference in %s:\nLocal Fingerprint: %s\nCluster Fingerprint: %s", comparisonDescription, formattedExistingDelta, formattedProposedDelta); + } + } + + // existing must startWith proposed or proposed must startWith existing + if (existing.length() > proposed.length()) { + final String formattedExistingDelta = existing.substring(proposed.length(), Math.min(existing.length(), proposed.length() + 200)); + return String.format("Found difference in %s:\nLocal Fingerprint contains additional configuration from Cluster Fingerprint: %s", comparisonDescription, formattedExistingDelta); + } else if (proposed.length() > existing.length()) { + final String formattedProposedDelta = proposed.substring(existing.length(), Math.min(proposed.length(), existing.length() + 200)); + return String.format("Found difference in %s:\nCluster Fingerprint contains additional configuration from Local Fingerprint: %s", comparisonDescription, formattedProposedDelta); + } + + return "Unable to find any discrepancies between fingerprints. Please contact the NiFi support team"; + } + + private byte[] toBytes(final FlowController flowController) throws FlowSerializationException { + final ByteArrayOutputStream result = new ByteArrayOutputStream(); + final StandardFlowSerializer flowSerializer = new StandardFlowSerializer(encryptor); + flowController.serialize(flowSerializer, result); + return result.toByteArray(); + } + + private static String getString(final Element element, final String childElementName) { + final List<Element> nodeList = getChildrenByTagName(element, childElementName); + if (nodeList == null || nodeList.isEmpty()) { + return ""; + } + final Element childElement = nodeList.get(0); + return childElement.getTextContent(); + } + + private static int getInt(final Element element, final String childElementName) { + return Integer.parseInt(getString(element, childElementName)); + } + + private static Integer getInteger(final Element element, final String childElementName) { + final String value = getString(element, childElementName); + return (value == null || value.trim().equals("") ? null : Integer.parseInt(value)); + } + + private static List<Element> getChildrenByTagName(final Element element, final String tagName) { + final List<Element> matches = new ArrayList<>(); + final NodeList nodeList = element.getChildNodes(); + for (int i = 0; i < nodeList.getLength(); i++) { + final Node node = nodeList.item(i); + if (!(node instanceof Element)) { + continue; + } + + final Element child = (Element) nodeList.item(i); + if (child.getNodeName().equals(tagName)) { + matches.add(child); + } + } + + return matches; + } + + private String formatFlowDiscrepancy(final String flowFingerprint, final int deltaIndex, final int deltaPad) { + return flowFingerprint.substring(Math.max(0, deltaIndex - deltaPad), Math.min(flowFingerprint.length(), deltaIndex + deltaPad)); + } +}