http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java new file mode 100644 index 0000000..c67181a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.connectable.Size; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; +import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.DomUtils; +import org.apache.nifi.web.api.dto.ConnectableDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.FunnelDTO; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; + +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +public class FlowFromDOMFactory { + + public static PositionDTO getPosition(final Element positionElement) { + if (positionElement == null) { + throw new IllegalArgumentException("Invalid Flow: Found no 'position' element"); + } + return new PositionDTO(Double.parseDouble(positionElement.getAttribute("x")), Double.parseDouble(positionElement.getAttribute("y"))); + } + + public static Size getSize(final Element sizeElement) { + if (sizeElement == null) { + throw new IllegalArgumentException("Invalid Flow: Found no 'size' element"); + } + + return new Size(Double.parseDouble(sizeElement.getAttribute("width")), Double.parseDouble(sizeElement.getAttribute("height"))); + } + + public static Map<String, String> getStyle(final Element stylesElement) { + final Map<String, String> styles = new HashMap<>(); + if (stylesElement == null) { + return styles; + } + + for (final Element styleElement : getChildrenByTagName(stylesElement, "style")) { + final String styleName = styleElement.getAttribute("name"); + final String styleValue = styleElement.getTextContent(); + styles.put(styleName, styleValue); + } + + return styles; + } + + public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) { + final ProcessGroupDTO dto = new ProcessGroupDTO(); + final String groupId = getString(element, "id"); + dto.setId(groupId); + dto.setParentGroupId(parentId); + dto.setName(getString(element, "name")); + dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); + dto.setComments(getString(element, "comment")); + + final Set<ProcessorDTO> processors = new HashSet<>(); + final Set<ConnectionDTO> connections = new HashSet<>(); + final Set<FunnelDTO> funnels = new HashSet<>(); + final Set<PortDTO> inputPorts = new HashSet<>(); + final Set<PortDTO> outputPorts = new HashSet<>(); + final Set<LabelDTO> labels = new HashSet<>(); + final Set<ProcessGroupDTO> processGroups = new HashSet<>(); + final Set<RemoteProcessGroupDTO> remoteProcessGroups = new HashSet<>(); + + final FlowSnippetDTO groupContents = new FlowSnippetDTO(); + groupContents.setConnections(connections); + groupContents.setFunnels(funnels); + groupContents.setInputPorts(inputPorts); + groupContents.setLabels(labels); + groupContents.setOutputPorts(outputPorts); + groupContents.setProcessGroups(processGroups); + groupContents.setProcessors(processors); + groupContents.setRemoteProcessGroups(remoteProcessGroups); + + NodeList nodeList = DomUtils.getChildNodesByTagName(element, "processor"); + for (int i = 0; i < nodeList.getLength(); i++) { + processors.add(getProcessor((Element) nodeList.item(i), encryptor)); + } + + nodeList = DomUtils.getChildNodesByTagName(element, "funnel"); + for (int i = 0; i < nodeList.getLength(); i++) { + funnels.add(getFunnel((Element) nodeList.item(i))); + } + + nodeList = DomUtils.getChildNodesByTagName(element, "inputPort"); + for (int i = 0; i < nodeList.getLength(); i++) { + inputPorts.add(getPort((Element) nodeList.item(i))); + } + + nodeList = DomUtils.getChildNodesByTagName(element, "outputPort"); + for (int i = 0; i < nodeList.getLength(); i++) { + outputPorts.add(getPort((Element) nodeList.item(i))); + } + + nodeList = DomUtils.getChildNodesByTagName(element, "label"); + for (int i = 0; i < nodeList.getLength(); i++) { + labels.add(getLabel((Element) nodeList.item(i))); + } + + nodeList = DomUtils.getChildNodesByTagName(element, "processGroup"); + for (int i = 0; i < nodeList.getLength(); i++) { + processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor)); + } + + nodeList = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup"); + for (int i = 0; i < nodeList.getLength(); i++) { + remoteProcessGroups.add(getRemoteProcessGroup((Element) nodeList.item(i))); + } + + nodeList = DomUtils.getChildNodesByTagName(element, "connection"); + for (int i = 0; i < nodeList.getLength(); i++) { + connections.add(getConnection((Element) nodeList.item(i))); + } + + dto.setContents(groupContents); + return dto; + } + + public static ConnectionDTO getConnection(final Element element) { + final ConnectionDTO dto = new ConnectionDTO(); + dto.setId(getString(element, "id")); + dto.setName(getString(element, "name")); + dto.setLabelIndex(getOptionalInt(element, "labelIndex")); + dto.setzIndex(getOptionalLong(element, "zIndex")); + + final List<PositionDTO> bends = new ArrayList<>(); + final Element bendPointsElement = DomUtils.getChild(element, "bendPoints"); + if (bendPointsElement != null) { + for (final Element bendPointElement : getChildrenByTagName(bendPointsElement, "bendPoint")) { + final PositionDTO bend = getPosition(bendPointElement); + bends.add(bend); + } + } + dto.setBends(bends); + + final ConnectableDTO sourceConnectable = new ConnectableDTO(); + dto.setSource(sourceConnectable); + sourceConnectable.setId(getString(element, "sourceId")); + sourceConnectable.setGroupId(getString(element, "sourceGroupId")); + sourceConnectable.setType(getString(element, "sourceType")); + + final ConnectableDTO destConnectable = new ConnectableDTO(); + dto.setDestination(destConnectable); + destConnectable.setId(getString(element, "destinationId")); + destConnectable.setGroupId(getString(element, "destinationGroupId")); + destConnectable.setType(getString(element, "destinationType")); + + final Set<String> relationships = new HashSet<>(); + final List<Element> relationshipNodeList = getChildrenByTagName(element, "relationship"); + for (final Element relationshipElem : relationshipNodeList) { + relationships.add(relationshipElem.getTextContent()); + } + dto.setSelectedRelationships(relationships); + + dto.setBackPressureObjectThreshold(getLong(element, "maxWorkQueueSize")); + + final String maxDataSize = getString(element, "maxWorkQueueDataSize"); + if (maxDataSize != null && !maxDataSize.trim().isEmpty()) { + dto.setBackPressureDataSizeThreshold(maxDataSize); + } + + String expiration = getString(element, "flowFileExpiration"); + if (expiration == null) { + expiration = "0 sec"; + } + dto.setFlowFileExpiration(expiration); + + final List<String> prioritizerClasses = new ArrayList<>(); + final List<Element> prioritizerNodeList = getChildrenByTagName(element, "queuePrioritizerClass"); + for (final Element prioritizerElement : prioritizerNodeList) { + prioritizerClasses.add(prioritizerElement.getTextContent().trim()); + } + dto.setPrioritizers(prioritizerClasses); + + return dto; + } + + public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element) { + final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO(); + dto.setId(getString(element, "id")); + dto.setName(getString(element, "name")); + dto.setTargetUri(getString(element, "url")); + dto.setTransmitting(getBoolean(element, "transmitting")); + dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); + dto.setCommunicationsTimeout(getString(element, "timeout")); + dto.setComments(getString(element, "comment")); + + return dto; + } + + public static LabelDTO getLabel(final Element element) { + final LabelDTO dto = new LabelDTO(); + dto.setId(getString(element, "id")); + dto.setLabel(getString(element, "value")); + dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); + final Size size = getSize(DomUtils.getChild(element, "size")); + dto.setWidth(size.getWidth()); + dto.setHeight(size.getHeight()); + dto.setStyle(getStyle(DomUtils.getChild(element, "styles"))); + + return dto; + } + + public static FunnelDTO getFunnel(final Element element) { + final FunnelDTO dto = new FunnelDTO(); + dto.setId(getString(element, "id")); + dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); + + return dto; + } + + public static PortDTO getPort(final Element element) { + final PortDTO portDTO = new PortDTO(); + portDTO.setId(getString(element, "id")); + portDTO.setPosition(getPosition(DomUtils.getChild(element, "position"))); + portDTO.setName(getString(element, "name")); + portDTO.setComments(getString(element, "comments")); + final ScheduledState scheduledState = getScheduledState(element); + portDTO.setState(scheduledState.toString()); + + final List<Element> maxTasksElements = getChildrenByTagName(element, "maxConcurrentTasks"); + if (!maxTasksElements.isEmpty()) { + portDTO.setConcurrentlySchedulableTaskCount(Integer.parseInt(maxTasksElements.get(0).getTextContent())); + } + + final List<Element> userAccessControls = getChildrenByTagName(element, "userAccessControl"); + if (userAccessControls != null && !userAccessControls.isEmpty()) { + final Set<String> users = new HashSet<>(); + portDTO.setUserAccessControl(users); + for (final Element userElement : userAccessControls) { + users.add(userElement.getTextContent()); + } + } + + final List<Element> groupAccessControls = getChildrenByTagName(element, "groupAccessControl"); + if (groupAccessControls != null && !groupAccessControls.isEmpty()) { + final Set<String> groups = new HashSet<>(); + portDTO.setGroupAccessControl(groups); + for (final Element groupElement : groupAccessControls) { + groups.add(groupElement.getTextContent()); + } + } + + return portDTO; + } + + public static RemoteProcessGroupPortDescriptor getRemoteProcessGroupPort(final Element element) { + final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); + + // What we have serialized is the ID of the Remote Process Group, followed by a dash ('-'), followed by + // the actual ID of the port; we want to get rid of the remote process group id. + String id = getString(element, "id"); + if (id.length() > 37) { + id = id.substring(37); + } + + descriptor.setId(id); + descriptor.setName(getString(element, "name")); + descriptor.setComments(getString(element, "comments")); + descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); + descriptor.setUseCompression(getBoolean(element, "useCompression")); + descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState"))); + + return descriptor; + } + + public static ProcessorDTO getProcessor(final Element element, final StringEncryptor encryptor) { + final ProcessorDTO dto = new ProcessorDTO(); + + dto.setId(getString(element, "id")); + dto.setName(getString(element, "name")); + dto.setType(getString(element, "class")); + dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); + dto.setStyle(getStyle(DomUtils.getChild(element, "styles"))); + + final ProcessorConfigDTO configDto = new ProcessorConfigDTO(); + dto.setConfig(configDto); + configDto.setComments(getString(element, "comment")); + configDto.setAnnotationData(getString(element, "annotationData")); + configDto.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); + final String schedulingPeriod = getString(element, "schedulingPeriod"); + configDto.setSchedulingPeriod(schedulingPeriod); + configDto.setPenaltyDuration(getString(element, "penalizationPeriod")); + configDto.setYieldDuration(getString(element, "yieldPeriod")); + configDto.setBulletinLevel(getString(element, "bulletinLevel")); + configDto.setLossTolerant(getBoolean(element, "lossTolerant")); + final ScheduledState scheduledState = getScheduledState(element); + dto.setState(scheduledState.toString()); + + // handle scheduling strategy + final String schedulingStrategyName = getString(element, "schedulingStrategy"); + if (schedulingStrategyName == null || schedulingStrategyName.trim().isEmpty()) { + configDto.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN.name()); + } else { + configDto.setSchedulingStrategy(schedulingStrategyName.trim()); + } + + final Long runDurationNanos = getOptionalLong(element, "runDurationNanos"); + if (runDurationNanos != null) { + configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos)); + } + + final LinkedHashMap<String, String> properties = new LinkedHashMap<>(); + final List<Element> propertyNodeList = getChildrenByTagName(element, "property"); + for (final Element propertyElement : propertyNodeList) { + final String name = getString(propertyElement, "name"); + final String value = decrypt(getString(propertyElement, "value"), encryptor); + properties.put(name, value); + } + configDto.setProperties(properties); + + final Set<String> autoTerminatedRelationships = new HashSet<>(); + final List<Element> autoTerminateList = getChildrenByTagName(element, "autoTerminatedRelationship"); + for (final Element autoTerminateElement : autoTerminateList) { + autoTerminatedRelationships.add(autoTerminateElement.getTextContent()); + } + configDto.setAutoTerminatedRelationships(autoTerminatedRelationships); + + return dto; + } + + private static String getString(final Element element, final String childElementName) { + final List<Element> nodeList = getChildrenByTagName(element, childElementName); + if (nodeList == null || nodeList.isEmpty()) { + return null; + } + final Element childElement = nodeList.get(0); + return childElement.getTextContent(); + } + + private static Integer getOptionalInt(final Element element, final String childElementName) { + final List<Element> nodeList = getChildrenByTagName(element, childElementName); + if (nodeList == null || nodeList.isEmpty()) { + return null; + } + final Element childElement = nodeList.get(0); + final String val = childElement.getTextContent(); + if (val == null) { + return null; + } + return Integer.parseInt(val); + } + + private static Long getOptionalLong(final Element element, final String childElementName) { + final List<Element> nodeList = getChildrenByTagName(element, childElementName); + if (nodeList == null || nodeList.isEmpty()) { + return null; + } + final Element childElement = nodeList.get(0); + final String val = childElement.getTextContent(); + if (val == null) { + return null; + } + return Long.parseLong(val); + } + + private static int getInt(final Element element, final String childElementName) { + return Integer.parseInt(getString(element, childElementName)); + } + + private static long getLong(final Element element, final String childElementName) { + return Long.parseLong(getString(element, childElementName)); + } + + private static boolean getBoolean(final Element element, final String childElementName) { + return Boolean.parseBoolean(getString(element, childElementName)); + } + + private static ScheduledState getScheduledState(final Element element) { + return ScheduledState.valueOf(getString(element, "scheduledState")); + } + + private static List<Element> getChildrenByTagName(final Element element, final String childElementName) { + return DomUtils.getChildElementsByTagName(element, childElementName); + } + + private static String decrypt(final String value, final StringEncryptor encryptor) { + if (value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX)) { + return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length())); + } else { + return value; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java new file mode 100644 index 0000000..f1ee760 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +/** + * Represents the exceptional case when flow configuration is malformed and + * therefore, cannot be serialized or deserialized. + * + * @author unattributed + */ +public class FlowSerializationException extends RuntimeException { + + private static final long serialVersionUID = 128934798237L; + + public FlowSerializationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public FlowSerializationException(Throwable cause) { + super(cause); + } + + public FlowSerializationException(String message, Throwable cause) { + super(message, cause); + } + + public FlowSerializationException(String message) { + super(message); + } + + public FlowSerializationException() { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java new file mode 100644 index 0000000..331b26c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.io.OutputStream; + +/** + * Serializes the flow configuration of a controller instance to an output + * stream. + * + * @author unattributed + */ +public interface FlowSerializer { + + public static final String ENC_PREFIX = "enc{"; + public static final String ENC_SUFFIX = "}"; + + /** + * Serializes the flow configuration of a controller instance. + * + * @param controller a controller + * @param os an output stream to write the configuration to + * + * @throws FlowSerializationException if serialization failed + */ + void serialize(FlowController controller, OutputStream os) throws FlowSerializationException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java new file mode 100644 index 0000000..706ac46 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +/** + * Represents the exceptional case when a controller managing an existing flow + * fails to fully load a different flow. + * + * @author unattributed + */ +public class FlowSynchronizationException extends RuntimeException { + + private static final long serialVersionUID = 109234802938L; + + public FlowSynchronizationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public FlowSynchronizationException(Throwable cause) { + super(cause); + } + + public FlowSynchronizationException(String message, Throwable cause) { + super(message, cause); + } + + public FlowSynchronizationException(String message) { + super(message); + } + + public FlowSynchronizationException() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java new file mode 100644 index 0000000..f6889fe --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.nifi.cluster.protocol.DataFlow; +import org.apache.nifi.encrypt.StringEncryptor; + +/** + * @author unattributed + */ +public interface FlowSynchronizer { + + /** + * Synchronizes the given controller with the given flow configuration. If + * loading the proposed flow configuration would cause the controller to + * orphan flow files, then an UninheritableFlowException is thrown. + * + * If the FlowSynchronizationException is thrown, then the controller may + * have changed some of its state and should no longer be used. + * + * @param controller the flow controller + * @param dataFlow the flow to load the controller with. If the flow is null + * or zero length, then the controller must not have a flow or else an + * UninheritableFlowException will be thrown. + * @param encryptor used for the encryption/decryption of sensitive property + * values + * + * @throws FlowSerializationException if proposed flow is not a valid flow + * configuration file + * @throws UninheritableFlowException if the proposed flow cannot be loaded + * by the controller because in doing so would risk orphaning flow files + * @throws FlowSynchronizationException if updates to the controller failed. + * If this exception is thrown, then the controller should be considered + * unsafe to be used + */ + void sync(FlowController controller, DataFlow dataFlow, StringEncryptor encryptor) + throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java new file mode 100644 index 0000000..fa33b49 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.io.ByteArrayInputStream; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +public class FlowUnmarshaller { + + /** + * Interprets the given byte array as an XML document that conforms to the + * Flow Configuration schema and returns a FlowSnippetDTO representing the + * flow + * + * @param flowContents + * @param encryptor + * @return + * @throws NullPointerException if <code>flowContents</code> is null + * @throws IOException + * @throws SAXException + * @throws ParserConfigurationException + */ + public static FlowSnippetDTO unmarshal(final byte[] flowContents, final StringEncryptor encryptor) throws IOException, SAXException, ParserConfigurationException { + if (Objects.requireNonNull(flowContents).length == 0) { + return new FlowSnippetDTO(); + } + + final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + final DocumentBuilder docBuilder = dbf.newDocumentBuilder(); + + final Document document = docBuilder.parse(new ByteArrayInputStream(flowContents)); + final FlowSnippetDTO flowDto = new FlowSnippetDTO(); + + final NodeList nodeList = document.getElementsByTagName("rootGroup"); + if (nodeList.getLength() == 0) { + return flowDto; + } + if (nodeList.getLength() > 1) { + throw new IllegalArgumentException("Contents contain multiple rootGroup elements"); + } + + final Set<ProcessGroupDTO> rootGroupSet = new HashSet<>(); + flowDto.setProcessGroups(rootGroupSet); + rootGroupSet.add(FlowFromDOMFactory.getProcessGroup(null, (Element) nodeList.item(0), encryptor)); + + return flowDto; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java new file mode 100644 index 0000000..415472f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.nifi.io.ByteArrayInputStream; +import org.apache.nifi.io.ByteArrayOutputStream; +import org.apache.nifi.io.DataOutputStream; +import org.apache.nifi.io.StreamUtils; +import org.apache.nifi.persistence.StandardSnippetDeserializer; +import org.apache.nifi.persistence.StandardSnippetSerializer; + +public class SnippetManager { + + private final ConcurrentMap<String, StandardSnippet> snippetMap = new ConcurrentHashMap<>(); + + public void addSnippet(final StandardSnippet snippet) { + final StandardSnippet oldSnippet = this.snippetMap.putIfAbsent(snippet.getId(), snippet); + if (oldSnippet != null) { + throw new IllegalStateException("Snippet with ID " + snippet.getId() + " already exists"); + } + } + + public void removeSnippet(final StandardSnippet snippet) { + if (!snippetMap.remove(snippet.getId(), snippet)) { + throw new IllegalStateException("Snippet is not contained in this SnippetManager"); + } + } + + public StandardSnippet getSnippet(final String identifier) { + return snippetMap.get(identifier); + } + + public Collection<StandardSnippet> getSnippets() { + return snippetMap.values(); + } + + public void clear() { + snippetMap.clear(); + } + + public static List<StandardSnippet> parseBytes(final byte[] bytes) { + final List<StandardSnippet> snippets = new ArrayList<>(); + + try (final InputStream rawIn = new ByteArrayInputStream(bytes); + final DataInputStream in = new DataInputStream(rawIn)) { + final int length = in.readInt(); + final byte[] buffer = new byte[length]; + StreamUtils.fillBuffer(in, buffer, true); + final StandardSnippet snippet = StandardSnippetDeserializer.deserialize(new ByteArrayInputStream(buffer)); + snippets.add(snippet); + } catch (final IOException e) { + throw new RuntimeException("Failed to parse bytes", e); // should never happen because of streams being used + } + + return snippets; + } + + public byte[] export() { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream dos = new DataOutputStream(baos)) { + for (final StandardSnippet snippet : getSnippets()) { + final byte[] bytes = StandardSnippetSerializer.serialize(snippet); + dos.writeInt(bytes.length); + dos.write(bytes); + } + + return baos.toByteArray(); + } catch (final IOException e) { + // won't happen + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java new file mode 100644 index 0000000..2899a85 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.nifi.controller.Counter; +import java.util.concurrent.atomic.AtomicLong; + +public class StandardCounter implements Counter { + + private final String identifier; + private final String context; + private final String name; + private final AtomicLong value; + + public StandardCounter(final String identifier, final String context, final String name) { + this.identifier = identifier; + this.context = context; + this.name = name; + this.value = new AtomicLong(0L); + } + + public void adjust(final long delta) { + this.value.addAndGet(delta); + } + + public String getName() { + return name; + } + + public long getValue() { + return this.value.get(); + } + + public String getContext() { + return context; + } + + public String getIdentifier() { + return identifier; + } + + public void reset() { + this.value.set(0); + } + + @Override + public String toString() { + return "Counter[identifier=" + identifier + ", context=" + context + ", name=" + name + ", value=" + value + ']'; + } + + public static UnmodifiableCounter unmodifiableCounter(final Counter counter) { + return new UnmodifiableCounter(counter); + } + + static class UnmodifiableCounter extends StandardCounter { + + private final Counter counter; + + public UnmodifiableCounter(final Counter counter) { + super(counter.getIdentifier(), counter.getContext(), counter.getName()); + this.counter = counter; + } + + @Override + public void adjust(long delta) { + throw new UnsupportedOperationException("Cannot modify value of UnmodifiableCounter"); + } + + @Override + public String getName() { + return counter.getName(); + } + + @Override + public long getValue() { + return counter.getValue(); + } + + @Override + public String getContext() { + return counter.getContext(); + } + + @Override + public String getIdentifier() { + return counter.getIdentifier(); + } + + @Override + public String toString() { + return counter.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java new file mode 100644 index 0000000..e08a94d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.io.BufferedOutputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.TransformerFactoryConfigurationError; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.RootGroupPort; + +import org.w3c.dom.DOMException; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Serializes a Flow Controller as XML to an output stream. + * + * NOT THREAD-SAFE. + */ +public class StandardFlowSerializer implements FlowSerializer { + + private final StringEncryptor encryptor; + + public StandardFlowSerializer(final StringEncryptor encryptor) { + this.encryptor = encryptor; + } + + @Override + public void serialize(final FlowController controller, final OutputStream os) throws FlowSerializationException { + try { + // create a new, empty document + final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + final DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); + final Document doc = docBuilder.newDocument(); + + // populate document with controller state + final Element rootNode = doc.createElement("flowController"); + doc.appendChild(rootNode); + addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount()); + addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); + addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup"); + + final DOMSource domSource = new DOMSource(doc); + final StreamResult streamResult = new StreamResult(new BufferedOutputStream(os)); + + // configure the transformer and convert the DOM + final TransformerFactory transformFactory = TransformerFactory.newInstance(); + final Transformer transformer = transformFactory.newTransformer(); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + + // transform the document to byte stream + transformer.transform(domSource, streamResult); + + } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) { + throw new FlowSerializationException(e); + } + } + + private void addSize(final Element parentElement, final Size size) { + final Element element = parentElement.getOwnerDocument().createElement("size"); + element.setAttribute("width", String.valueOf(size.getWidth())); + element.setAttribute("height", String.valueOf(size.getHeight())); + parentElement.appendChild(element); + } + + private void addPosition(final Element parentElement, final Position position) { + addPosition(parentElement, position, "position"); + } + + private void addPosition(final Element parentElement, final Position position, final String elementName) { + final Element element = parentElement.getOwnerDocument().createElement(elementName); + element.setAttribute("x", String.valueOf(position.getX())); + element.setAttribute("y", String.valueOf(position.getY())); + parentElement.appendChild(element); + } + + private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement(elementName); + parentElement.appendChild(element); + addTextElement(element, "id", group.getIdentifier()); + addTextElement(element, "name", group.getName()); + addPosition(element, group.getPosition()); + addTextElement(element, "comment", group.getComments()); + + for (final ProcessorNode processor : group.getProcessors()) { + addProcessor(element, processor); + } + + if (group.isRootGroup()) { + for (final Port port : group.getInputPorts()) { + addRootGroupPort(element, (RootGroupPort) port, "inputPort"); + } + + for (final Port port : group.getOutputPorts()) { + addRootGroupPort(element, (RootGroupPort) port, "outputPort"); + } + } else { + for (final Port port : group.getInputPorts()) { + addPort(element, port, "inputPort"); + } + + for (final Port port : group.getOutputPorts()) { + addPort(element, port, "outputPort"); + } + } + + for (final Label label : group.getLabels()) { + addLabel(element, label); + } + + for (final Funnel funnel : group.getFunnels()) { + addFunnel(element, funnel); + } + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + addProcessGroup(element, childGroup, "processGroup"); + } + + for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) { + addRemoteProcessGroup(element, remoteRef); + } + + for (final Connection connection : group.getConnections()) { + addConnection(element, connection); + } + } + + private void addStyle(final Element parentElement, final Map<String, String> style) { + final Element element = parentElement.getOwnerDocument().createElement("styles"); + + for (final Map.Entry<String, String> entry : style.entrySet()) { + final Element styleElement = parentElement.getOwnerDocument().createElement("style"); + styleElement.setAttribute("name", entry.getKey()); + styleElement.setTextContent(entry.getValue()); + element.appendChild(styleElement); + } + + parentElement.appendChild(element); + } + + private void addLabel(final Element parentElement, final Label label) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("label"); + parentElement.appendChild(element); + addTextElement(element, "id", label.getIdentifier()); + + addPosition(element, label.getPosition()); + addSize(element, label.getSize()); + addStyle(element, label.getStyle()); + + addTextElement(element, "value", label.getValue()); + parentElement.appendChild(element); + } + + private void addFunnel(final Element parentElement, final Funnel funnel) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("funnel"); + parentElement.appendChild(element); + addTextElement(element, "id", funnel.getIdentifier()); + addPosition(element, funnel.getPosition()); + } + + private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("remoteProcessGroup"); + parentElement.appendChild(element); + addTextElement(element, "id", remoteRef.getIdentifier()); + addTextElement(element, "name", remoteRef.getName()); + addPosition(element, remoteRef.getPosition()); + addTextElement(element, "comment", remoteRef.getComments()); + addTextElement(element, "url", remoteRef.getTargetUri().toString()); + addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout()); + addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration()); + addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting())); + + for (final RemoteGroupPort port : remoteRef.getInputPorts()) { + if (port.hasIncomingConnection()) { + addRemoteGroupPort(element, port, "inputPort"); + } + } + + for (final RemoteGroupPort port : remoteRef.getOutputPorts()) { + if (!port.getConnections().isEmpty()) { + addRemoteGroupPort(element, port, "outputPort"); + } + } + + parentElement.appendChild(element); + } + + private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement(elementName); + parentElement.appendChild(element); + addTextElement(element, "id", port.getIdentifier()); + addTextElement(element, "name", port.getName()); + addPosition(element, port.getPosition()); + addTextElement(element, "comments", port.getComments()); + addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); + addTextElement(element, "useCompression", String.valueOf(((RemoteGroupPort) port).isUseCompression())); + + parentElement.appendChild(element); + } + + private void addPort(final Element parentElement, final Port port, final String elementName) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement(elementName); + parentElement.appendChild(element); + addTextElement(element, "id", port.getIdentifier()); + addTextElement(element, "name", port.getName()); + addPosition(element, port.getPosition()); + addTextElement(element, "comments", port.getComments()); + addTextElement(element, "scheduledState", port.getScheduledState().name()); + + parentElement.appendChild(element); + } + + private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement(elementName); + parentElement.appendChild(element); + addTextElement(element, "id", port.getIdentifier()); + addTextElement(element, "name", port.getName()); + addPosition(element, port.getPosition()); + addTextElement(element, "comments", port.getComments()); + addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks())); + for (final String user : port.getUserAccessControl()) { + addTextElement(element, "userAccessControl", user); + } + for (final String group : port.getGroupAccessControl()) { + addTextElement(element, "groupAccessControl", group); + } + + parentElement.appendChild(element); + } + + private void addProcessor(final Element parentElement, final ProcessorNode processor) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("processor"); + parentElement.appendChild(element); + addTextElement(element, "id", processor.getIdentifier()); + addTextElement(element, "name", processor.getName()); + + addPosition(element, processor.getPosition()); + addStyle(element, processor.getStyle()); + + addTextElement(element, "comment", processor.getComments()); + addTextElement(element, "class", processor.getProcessor().getClass().getCanonicalName()); + addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks()); + addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod()); + addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod()); + addTextElement(element, "yieldPeriod", processor.getYieldPeriod()); + addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString()); + addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); + addTextElement(element, "scheduledState", processor.getScheduledState().name()); + addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); + addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); + + // properties. + for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + String value = entry.getValue(); + + if (value != null && descriptor.isSensitive()) { + value = ENC_PREFIX + encryptor.encrypt(value) + ENC_SUFFIX; + } + + if (value == null) { + value = descriptor.getDefaultValue(); + } + + final Element propElement = doc.createElement("property"); + addTextElement(propElement, "name", descriptor.getName()); + if (value != null) { + addTextElement(propElement, "value", value); + } + + element.appendChild(propElement); + } + + final String annotationData = processor.getAnnotationData(); + if (annotationData != null) { + addTextElement(element, "annotationData", annotationData); + } + + for (final Relationship rel : processor.getAutoTerminatedRelationships()) { + addTextElement(element, "autoTerminatedRelationship", rel.getName()); + } + } + + private void addConnection(final Element parentElement, final Connection connection) { + final Document doc = parentElement.getOwnerDocument(); + final Element element = doc.createElement("connection"); + parentElement.appendChild(element); + addTextElement(element, "id", connection.getIdentifier()); + addTextElement(element, "name", connection.getName()); + + final Element bendPointsElement = doc.createElement("bendPoints"); + element.appendChild(bendPointsElement); + for (final Position bendPoint : connection.getBendPoints()) { + addPosition(bendPointsElement, bendPoint, "bendPoint"); + } + + addTextElement(element, "labelIndex", connection.getLabelIndex()); + addTextElement(element, "zIndex", connection.getZIndex()); + + final String sourceId = connection.getSource().getIdentifier(); + final ConnectableType sourceType = connection.getSource().getConnectableType(); + final String sourceGroupId; + if (sourceType == ConnectableType.REMOTE_OUTPUT_PORT) { + sourceGroupId = ((RemoteGroupPort) connection.getSource()).getRemoteProcessGroup().getIdentifier(); + } else { + sourceGroupId = connection.getSource().getProcessGroup().getIdentifier(); + } + + final ConnectableType destinationType = connection.getDestination().getConnectableType(); + final String destinationId = connection.getDestination().getIdentifier(); + final String destinationGroupId; + if (destinationType == ConnectableType.REMOTE_INPUT_PORT) { + destinationGroupId = ((RemoteGroupPort) connection.getDestination()).getRemoteProcessGroup().getIdentifier(); + } else { + destinationGroupId = connection.getDestination().getProcessGroup().getIdentifier(); + } + + addTextElement(element, "sourceId", sourceId); + addTextElement(element, "sourceGroupId", sourceGroupId); + addTextElement(element, "sourceType", sourceType.toString()); + + addTextElement(element, "destinationId", destinationId); + addTextElement(element, "destinationGroupId", destinationGroupId); + addTextElement(element, "destinationType", destinationType.toString()); + + for (final Relationship relationship : connection.getRelationships()) { + addTextElement(element, "relationship", relationship.getName()); + } + + addTextElement(element, "maxWorkQueueSize", connection.getFlowFileQueue().getBackPressureObjectThreshold()); + addTextElement(element, "maxWorkQueueDataSize", connection.getFlowFileQueue().getBackPressureDataSizeThreshold()); + + addTextElement(element, "flowFileExpiration", connection.getFlowFileQueue().getFlowFileExpiration()); + for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) { + final String className = comparator.getClass().getCanonicalName(); + addTextElement(element, "queuePrioritizerClass", className); + } + + parentElement.appendChild(element); + } + + private void addTextElement(final Element element, final String name, final long value) { + addTextElement(element, name, String.valueOf(value)); + } + + private void addTextElement(final Element element, final String name, final String value) { + final Document doc = element.getOwnerDocument(); + final Element toAdd = doc.createElement(name); + toAdd.setTextContent(value); + element.appendChild(toAdd); + } + +}