http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java deleted file mode 100644 index c67181a..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java +++ /dev/null @@ -1,418 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.connectable.Size; -import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; -import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.DomUtils; -import org.apache.nifi.web.api.dto.ConnectableDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.PositionDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; - -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -public class FlowFromDOMFactory { - - public static PositionDTO getPosition(final Element positionElement) { - if (positionElement == null) { - throw new IllegalArgumentException("Invalid Flow: Found no 'position' element"); - } - return new PositionDTO(Double.parseDouble(positionElement.getAttribute("x")), Double.parseDouble(positionElement.getAttribute("y"))); - } - - public static Size getSize(final Element sizeElement) { - if (sizeElement == null) { - throw new IllegalArgumentException("Invalid Flow: Found no 'size' element"); - } - - return new Size(Double.parseDouble(sizeElement.getAttribute("width")), Double.parseDouble(sizeElement.getAttribute("height"))); - } - - public static Map<String, String> getStyle(final Element stylesElement) { - final Map<String, String> styles = new HashMap<>(); - if (stylesElement == null) { - return styles; - } - - for (final Element styleElement : getChildrenByTagName(stylesElement, "style")) { - final String styleName = styleElement.getAttribute("name"); - final String styleValue = styleElement.getTextContent(); - styles.put(styleName, styleValue); - } - - return styles; - } - - public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) { - final ProcessGroupDTO dto = new ProcessGroupDTO(); - final String groupId = getString(element, "id"); - dto.setId(groupId); - dto.setParentGroupId(parentId); - dto.setName(getString(element, "name")); - dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); - dto.setComments(getString(element, "comment")); - - final Set<ProcessorDTO> processors = new HashSet<>(); - final Set<ConnectionDTO> connections = new HashSet<>(); - final Set<FunnelDTO> funnels = new HashSet<>(); - final Set<PortDTO> inputPorts = new HashSet<>(); - final Set<PortDTO> outputPorts = new HashSet<>(); - final Set<LabelDTO> labels = new HashSet<>(); - final Set<ProcessGroupDTO> processGroups = new HashSet<>(); - final Set<RemoteProcessGroupDTO> remoteProcessGroups = new HashSet<>(); - - final FlowSnippetDTO groupContents = new FlowSnippetDTO(); - groupContents.setConnections(connections); - groupContents.setFunnels(funnels); - groupContents.setInputPorts(inputPorts); - groupContents.setLabels(labels); - groupContents.setOutputPorts(outputPorts); - groupContents.setProcessGroups(processGroups); - groupContents.setProcessors(processors); - groupContents.setRemoteProcessGroups(remoteProcessGroups); - - NodeList nodeList = DomUtils.getChildNodesByTagName(element, "processor"); - for (int i = 0; i < nodeList.getLength(); i++) { - processors.add(getProcessor((Element) nodeList.item(i), encryptor)); - } - - nodeList = DomUtils.getChildNodesByTagName(element, "funnel"); - for (int i = 0; i < nodeList.getLength(); i++) { - funnels.add(getFunnel((Element) nodeList.item(i))); - } - - nodeList = DomUtils.getChildNodesByTagName(element, "inputPort"); - for (int i = 0; i < nodeList.getLength(); i++) { - inputPorts.add(getPort((Element) nodeList.item(i))); - } - - nodeList = DomUtils.getChildNodesByTagName(element, "outputPort"); - for (int i = 0; i < nodeList.getLength(); i++) { - outputPorts.add(getPort((Element) nodeList.item(i))); - } - - nodeList = DomUtils.getChildNodesByTagName(element, "label"); - for (int i = 0; i < nodeList.getLength(); i++) { - labels.add(getLabel((Element) nodeList.item(i))); - } - - nodeList = DomUtils.getChildNodesByTagName(element, "processGroup"); - for (int i = 0; i < nodeList.getLength(); i++) { - processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor)); - } - - nodeList = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup"); - for (int i = 0; i < nodeList.getLength(); i++) { - remoteProcessGroups.add(getRemoteProcessGroup((Element) nodeList.item(i))); - } - - nodeList = DomUtils.getChildNodesByTagName(element, "connection"); - for (int i = 0; i < nodeList.getLength(); i++) { - connections.add(getConnection((Element) nodeList.item(i))); - } - - dto.setContents(groupContents); - return dto; - } - - public static ConnectionDTO getConnection(final Element element) { - final ConnectionDTO dto = new ConnectionDTO(); - dto.setId(getString(element, "id")); - dto.setName(getString(element, "name")); - dto.setLabelIndex(getOptionalInt(element, "labelIndex")); - dto.setzIndex(getOptionalLong(element, "zIndex")); - - final List<PositionDTO> bends = new ArrayList<>(); - final Element bendPointsElement = DomUtils.getChild(element, "bendPoints"); - if (bendPointsElement != null) { - for (final Element bendPointElement : getChildrenByTagName(bendPointsElement, "bendPoint")) { - final PositionDTO bend = getPosition(bendPointElement); - bends.add(bend); - } - } - dto.setBends(bends); - - final ConnectableDTO sourceConnectable = new ConnectableDTO(); - dto.setSource(sourceConnectable); - sourceConnectable.setId(getString(element, "sourceId")); - sourceConnectable.setGroupId(getString(element, "sourceGroupId")); - sourceConnectable.setType(getString(element, "sourceType")); - - final ConnectableDTO destConnectable = new ConnectableDTO(); - dto.setDestination(destConnectable); - destConnectable.setId(getString(element, "destinationId")); - destConnectable.setGroupId(getString(element, "destinationGroupId")); - destConnectable.setType(getString(element, "destinationType")); - - final Set<String> relationships = new HashSet<>(); - final List<Element> relationshipNodeList = getChildrenByTagName(element, "relationship"); - for (final Element relationshipElem : relationshipNodeList) { - relationships.add(relationshipElem.getTextContent()); - } - dto.setSelectedRelationships(relationships); - - dto.setBackPressureObjectThreshold(getLong(element, "maxWorkQueueSize")); - - final String maxDataSize = getString(element, "maxWorkQueueDataSize"); - if (maxDataSize != null && !maxDataSize.trim().isEmpty()) { - dto.setBackPressureDataSizeThreshold(maxDataSize); - } - - String expiration = getString(element, "flowFileExpiration"); - if (expiration == null) { - expiration = "0 sec"; - } - dto.setFlowFileExpiration(expiration); - - final List<String> prioritizerClasses = new ArrayList<>(); - final List<Element> prioritizerNodeList = getChildrenByTagName(element, "queuePrioritizerClass"); - for (final Element prioritizerElement : prioritizerNodeList) { - prioritizerClasses.add(prioritizerElement.getTextContent().trim()); - } - dto.setPrioritizers(prioritizerClasses); - - return dto; - } - - public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element) { - final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO(); - dto.setId(getString(element, "id")); - dto.setName(getString(element, "name")); - dto.setTargetUri(getString(element, "url")); - dto.setTransmitting(getBoolean(element, "transmitting")); - dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); - dto.setCommunicationsTimeout(getString(element, "timeout")); - dto.setComments(getString(element, "comment")); - - return dto; - } - - public static LabelDTO getLabel(final Element element) { - final LabelDTO dto = new LabelDTO(); - dto.setId(getString(element, "id")); - dto.setLabel(getString(element, "value")); - dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); - final Size size = getSize(DomUtils.getChild(element, "size")); - dto.setWidth(size.getWidth()); - dto.setHeight(size.getHeight()); - dto.setStyle(getStyle(DomUtils.getChild(element, "styles"))); - - return dto; - } - - public static FunnelDTO getFunnel(final Element element) { - final FunnelDTO dto = new FunnelDTO(); - dto.setId(getString(element, "id")); - dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); - - return dto; - } - - public static PortDTO getPort(final Element element) { - final PortDTO portDTO = new PortDTO(); - portDTO.setId(getString(element, "id")); - portDTO.setPosition(getPosition(DomUtils.getChild(element, "position"))); - portDTO.setName(getString(element, "name")); - portDTO.setComments(getString(element, "comments")); - final ScheduledState scheduledState = getScheduledState(element); - portDTO.setState(scheduledState.toString()); - - final List<Element> maxTasksElements = getChildrenByTagName(element, "maxConcurrentTasks"); - if (!maxTasksElements.isEmpty()) { - portDTO.setConcurrentlySchedulableTaskCount(Integer.parseInt(maxTasksElements.get(0).getTextContent())); - } - - final List<Element> userAccessControls = getChildrenByTagName(element, "userAccessControl"); - if (userAccessControls != null && !userAccessControls.isEmpty()) { - final Set<String> users = new HashSet<>(); - portDTO.setUserAccessControl(users); - for (final Element userElement : userAccessControls) { - users.add(userElement.getTextContent()); - } - } - - final List<Element> groupAccessControls = getChildrenByTagName(element, "groupAccessControl"); - if (groupAccessControls != null && !groupAccessControls.isEmpty()) { - final Set<String> groups = new HashSet<>(); - portDTO.setGroupAccessControl(groups); - for (final Element groupElement : groupAccessControls) { - groups.add(groupElement.getTextContent()); - } - } - - return portDTO; - } - - public static RemoteProcessGroupPortDescriptor getRemoteProcessGroupPort(final Element element) { - final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); - - // What we have serialized is the ID of the Remote Process Group, followed by a dash ('-'), followed by - // the actual ID of the port; we want to get rid of the remote process group id. - String id = getString(element, "id"); - if (id.length() > 37) { - id = id.substring(37); - } - - descriptor.setId(id); - descriptor.setName(getString(element, "name")); - descriptor.setComments(getString(element, "comments")); - descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); - descriptor.setUseCompression(getBoolean(element, "useCompression")); - descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState"))); - - return descriptor; - } - - public static ProcessorDTO getProcessor(final Element element, final StringEncryptor encryptor) { - final ProcessorDTO dto = new ProcessorDTO(); - - dto.setId(getString(element, "id")); - dto.setName(getString(element, "name")); - dto.setType(getString(element, "class")); - dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); - dto.setStyle(getStyle(DomUtils.getChild(element, "styles"))); - - final ProcessorConfigDTO configDto = new ProcessorConfigDTO(); - dto.setConfig(configDto); - configDto.setComments(getString(element, "comment")); - configDto.setAnnotationData(getString(element, "annotationData")); - configDto.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); - final String schedulingPeriod = getString(element, "schedulingPeriod"); - configDto.setSchedulingPeriod(schedulingPeriod); - configDto.setPenaltyDuration(getString(element, "penalizationPeriod")); - configDto.setYieldDuration(getString(element, "yieldPeriod")); - configDto.setBulletinLevel(getString(element, "bulletinLevel")); - configDto.setLossTolerant(getBoolean(element, "lossTolerant")); - final ScheduledState scheduledState = getScheduledState(element); - dto.setState(scheduledState.toString()); - - // handle scheduling strategy - final String schedulingStrategyName = getString(element, "schedulingStrategy"); - if (schedulingStrategyName == null || schedulingStrategyName.trim().isEmpty()) { - configDto.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN.name()); - } else { - configDto.setSchedulingStrategy(schedulingStrategyName.trim()); - } - - final Long runDurationNanos = getOptionalLong(element, "runDurationNanos"); - if (runDurationNanos != null) { - configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos)); - } - - final LinkedHashMap<String, String> properties = new LinkedHashMap<>(); - final List<Element> propertyNodeList = getChildrenByTagName(element, "property"); - for (final Element propertyElement : propertyNodeList) { - final String name = getString(propertyElement, "name"); - final String value = decrypt(getString(propertyElement, "value"), encryptor); - properties.put(name, value); - } - configDto.setProperties(properties); - - final Set<String> autoTerminatedRelationships = new HashSet<>(); - final List<Element> autoTerminateList = getChildrenByTagName(element, "autoTerminatedRelationship"); - for (final Element autoTerminateElement : autoTerminateList) { - autoTerminatedRelationships.add(autoTerminateElement.getTextContent()); - } - configDto.setAutoTerminatedRelationships(autoTerminatedRelationships); - - return dto; - } - - private static String getString(final Element element, final String childElementName) { - final List<Element> nodeList = getChildrenByTagName(element, childElementName); - if (nodeList == null || nodeList.isEmpty()) { - return null; - } - final Element childElement = nodeList.get(0); - return childElement.getTextContent(); - } - - private static Integer getOptionalInt(final Element element, final String childElementName) { - final List<Element> nodeList = getChildrenByTagName(element, childElementName); - if (nodeList == null || nodeList.isEmpty()) { - return null; - } - final Element childElement = nodeList.get(0); - final String val = childElement.getTextContent(); - if (val == null) { - return null; - } - return Integer.parseInt(val); - } - - private static Long getOptionalLong(final Element element, final String childElementName) { - final List<Element> nodeList = getChildrenByTagName(element, childElementName); - if (nodeList == null || nodeList.isEmpty()) { - return null; - } - final Element childElement = nodeList.get(0); - final String val = childElement.getTextContent(); - if (val == null) { - return null; - } - return Long.parseLong(val); - } - - private static int getInt(final Element element, final String childElementName) { - return Integer.parseInt(getString(element, childElementName)); - } - - private static long getLong(final Element element, final String childElementName) { - return Long.parseLong(getString(element, childElementName)); - } - - private static boolean getBoolean(final Element element, final String childElementName) { - return Boolean.parseBoolean(getString(element, childElementName)); - } - - private static ScheduledState getScheduledState(final Element element) { - return ScheduledState.valueOf(getString(element, "scheduledState")); - } - - private static List<Element> getChildrenByTagName(final Element element, final String childElementName) { - return DomUtils.getChildElementsByTagName(element, childElementName); - } - - private static String decrypt(final String value, final StringEncryptor encryptor) { - if (value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX)) { - return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length())); - } else { - return value; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java deleted file mode 100644 index f1ee760..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -/** - * Represents the exceptional case when flow configuration is malformed and - * therefore, cannot be serialized or deserialized. - * - * @author unattributed - */ -public class FlowSerializationException extends RuntimeException { - - private static final long serialVersionUID = 128934798237L; - - public FlowSerializationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } - - public FlowSerializationException(Throwable cause) { - super(cause); - } - - public FlowSerializationException(String message, Throwable cause) { - super(message, cause); - } - - public FlowSerializationException(String message) { - super(message); - } - - public FlowSerializationException() { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java deleted file mode 100644 index 331b26c..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.io.OutputStream; - -/** - * Serializes the flow configuration of a controller instance to an output - * stream. - * - * @author unattributed - */ -public interface FlowSerializer { - - public static final String ENC_PREFIX = "enc{"; - public static final String ENC_SUFFIX = "}"; - - /** - * Serializes the flow configuration of a controller instance. - * - * @param controller a controller - * @param os an output stream to write the configuration to - * - * @throws FlowSerializationException if serialization failed - */ - void serialize(FlowController controller, OutputStream os) throws FlowSerializationException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java deleted file mode 100644 index 706ac46..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -/** - * Represents the exceptional case when a controller managing an existing flow - * fails to fully load a different flow. - * - * @author unattributed - */ -public class FlowSynchronizationException extends RuntimeException { - - private static final long serialVersionUID = 109234802938L; - - public FlowSynchronizationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } - - public FlowSynchronizationException(Throwable cause) { - super(cause); - } - - public FlowSynchronizationException(String message, Throwable cause) { - super(message, cause); - } - - public FlowSynchronizationException(String message) { - super(message); - } - - public FlowSynchronizationException() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java deleted file mode 100644 index f6889fe..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import org.apache.nifi.cluster.protocol.DataFlow; -import org.apache.nifi.encrypt.StringEncryptor; - -/** - * @author unattributed - */ -public interface FlowSynchronizer { - - /** - * Synchronizes the given controller with the given flow configuration. If - * loading the proposed flow configuration would cause the controller to - * orphan flow files, then an UninheritableFlowException is thrown. - * - * If the FlowSynchronizationException is thrown, then the controller may - * have changed some of its state and should no longer be used. - * - * @param controller the flow controller - * @param dataFlow the flow to load the controller with. If the flow is null - * or zero length, then the controller must not have a flow or else an - * UninheritableFlowException will be thrown. - * @param encryptor used for the encryption/decryption of sensitive property - * values - * - * @throws FlowSerializationException if proposed flow is not a valid flow - * configuration file - * @throws UninheritableFlowException if the proposed flow cannot be loaded - * by the controller because in doing so would risk orphaning flow files - * @throws FlowSynchronizationException if updates to the controller failed. - * If this exception is thrown, then the controller should be considered - * unsafe to be used - */ - void sync(FlowController controller, DataFlow dataFlow, StringEncryptor encryptor) - throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java deleted file mode 100644 index 42d7f1c..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Objects; -import java.util.Set; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - -import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; - -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; -import org.xml.sax.SAXException; - -public class FlowUnmarshaller { - - /** - * Interprets the given byte array as an XML document that conforms to the - * Flow Configuration schema and returns a FlowSnippetDTO representing the - * flow - * - * @param flowContents - * @param encryptor - * @return - * @throws NullPointerException if <code>flowContents</code> is null - * @throws IOException - * @throws SAXException - * @throws ParserConfigurationException - */ - public static FlowSnippetDTO unmarshal(final byte[] flowContents, final StringEncryptor encryptor) throws IOException, SAXException, ParserConfigurationException { - if (Objects.requireNonNull(flowContents).length == 0) { - return new FlowSnippetDTO(); - } - - final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); - final DocumentBuilder docBuilder = dbf.newDocumentBuilder(); - - final Document document = docBuilder.parse(new ByteArrayInputStream(flowContents)); - final FlowSnippetDTO flowDto = new FlowSnippetDTO(); - - final NodeList nodeList = document.getElementsByTagName("rootGroup"); - if (nodeList.getLength() == 0) { - return flowDto; - } - if (nodeList.getLength() > 1) { - throw new IllegalArgumentException("Contents contain multiple rootGroup elements"); - } - - final Set<ProcessGroupDTO> rootGroupSet = new HashSet<>(); - flowDto.setProcessGroups(rootGroupSet); - rootGroupSet.add(FlowFromDOMFactory.getProcessGroup(null, (Element) nodeList.item(0), encryptor)); - - return flowDto; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java deleted file mode 100644 index 3a9662e..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.stream.io.DataOutputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.persistence.StandardSnippetDeserializer; -import org.apache.nifi.persistence.StandardSnippetSerializer; - -public class SnippetManager { - - private final ConcurrentMap<String, StandardSnippet> snippetMap = new ConcurrentHashMap<>(); - - public void addSnippet(final StandardSnippet snippet) { - final StandardSnippet oldSnippet = this.snippetMap.putIfAbsent(snippet.getId(), snippet); - if (oldSnippet != null) { - throw new IllegalStateException("Snippet with ID " + snippet.getId() + " already exists"); - } - } - - public void removeSnippet(final StandardSnippet snippet) { - if (!snippetMap.remove(snippet.getId(), snippet)) { - throw new IllegalStateException("Snippet is not contained in this SnippetManager"); - } - } - - public StandardSnippet getSnippet(final String identifier) { - return snippetMap.get(identifier); - } - - public Collection<StandardSnippet> getSnippets() { - return snippetMap.values(); - } - - public void clear() { - snippetMap.clear(); - } - - public static List<StandardSnippet> parseBytes(final byte[] bytes) { - final List<StandardSnippet> snippets = new ArrayList<>(); - - try (final InputStream rawIn = new ByteArrayInputStream(bytes); - final DataInputStream in = new DataInputStream(rawIn)) { - final int length = in.readInt(); - final byte[] buffer = new byte[length]; - StreamUtils.fillBuffer(in, buffer, true); - final StandardSnippet snippet = StandardSnippetDeserializer.deserialize(new ByteArrayInputStream(buffer)); - snippets.add(snippet); - } catch (final IOException e) { - throw new RuntimeException("Failed to parse bytes", e); // should never happen because of streams being used - } - - return snippets; - } - - public byte[] export() { - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final DataOutputStream dos = new DataOutputStream(baos)) { - for (final StandardSnippet snippet : getSnippets()) { - final byte[] bytes = StandardSnippetSerializer.serialize(snippet); - dos.writeInt(bytes.length); - dos.write(bytes); - } - - return baos.toByteArray(); - } catch (final IOException e) { - // won't happen - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java deleted file mode 100644 index 2899a85..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import org.apache.nifi.controller.Counter; -import java.util.concurrent.atomic.AtomicLong; - -public class StandardCounter implements Counter { - - private final String identifier; - private final String context; - private final String name; - private final AtomicLong value; - - public StandardCounter(final String identifier, final String context, final String name) { - this.identifier = identifier; - this.context = context; - this.name = name; - this.value = new AtomicLong(0L); - } - - public void adjust(final long delta) { - this.value.addAndGet(delta); - } - - public String getName() { - return name; - } - - public long getValue() { - return this.value.get(); - } - - public String getContext() { - return context; - } - - public String getIdentifier() { - return identifier; - } - - public void reset() { - this.value.set(0); - } - - @Override - public String toString() { - return "Counter[identifier=" + identifier + ", context=" + context + ", name=" + name + ", value=" + value + ']'; - } - - public static UnmodifiableCounter unmodifiableCounter(final Counter counter) { - return new UnmodifiableCounter(counter); - } - - static class UnmodifiableCounter extends StandardCounter { - - private final Counter counter; - - public UnmodifiableCounter(final Counter counter) { - super(counter.getIdentifier(), counter.getContext(), counter.getName()); - this.counter = counter; - } - - @Override - public void adjust(long delta) { - throw new UnsupportedOperationException("Cannot modify value of UnmodifiableCounter"); - } - - @Override - public String getName() { - return counter.getName(); - } - - @Override - public long getValue() { - return counter.getValue(); - } - - @Override - public String getContext() { - return counter.getContext(); - } - - @Override - public String getIdentifier() { - return counter.getIdentifier(); - } - - @Override - public String toString() { - return counter.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java deleted file mode 100644 index e08a94d..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.io.BufferedOutputStream; -import java.io.OutputStream; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.TransformerFactoryConfigurationError; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Funnel; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.connectable.Position; -import org.apache.nifi.connectable.Size; -import org.apache.nifi.controller.label.Label; -import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.flowfile.FlowFilePrioritizer; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.groups.RemoteProcessGroup; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.RootGroupPort; - -import org.w3c.dom.DOMException; -import org.w3c.dom.Document; -import org.w3c.dom.Element; - -/** - * Serializes a Flow Controller as XML to an output stream. - * - * NOT THREAD-SAFE. - */ -public class StandardFlowSerializer implements FlowSerializer { - - private final StringEncryptor encryptor; - - public StandardFlowSerializer(final StringEncryptor encryptor) { - this.encryptor = encryptor; - } - - @Override - public void serialize(final FlowController controller, final OutputStream os) throws FlowSerializationException { - try { - // create a new, empty document - final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); - final DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); - final Document doc = docBuilder.newDocument(); - - // populate document with controller state - final Element rootNode = doc.createElement("flowController"); - doc.appendChild(rootNode); - addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount()); - addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); - addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup"); - - final DOMSource domSource = new DOMSource(doc); - final StreamResult streamResult = new StreamResult(new BufferedOutputStream(os)); - - // configure the transformer and convert the DOM - final TransformerFactory transformFactory = TransformerFactory.newInstance(); - final Transformer transformer = transformFactory.newTransformer(); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - - // transform the document to byte stream - transformer.transform(domSource, streamResult); - - } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) { - throw new FlowSerializationException(e); - } - } - - private void addSize(final Element parentElement, final Size size) { - final Element element = parentElement.getOwnerDocument().createElement("size"); - element.setAttribute("width", String.valueOf(size.getWidth())); - element.setAttribute("height", String.valueOf(size.getHeight())); - parentElement.appendChild(element); - } - - private void addPosition(final Element parentElement, final Position position) { - addPosition(parentElement, position, "position"); - } - - private void addPosition(final Element parentElement, final Position position, final String elementName) { - final Element element = parentElement.getOwnerDocument().createElement(elementName); - element.setAttribute("x", String.valueOf(position.getX())); - element.setAttribute("y", String.valueOf(position.getY())); - parentElement.appendChild(element); - } - - private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); - parentElement.appendChild(element); - addTextElement(element, "id", group.getIdentifier()); - addTextElement(element, "name", group.getName()); - addPosition(element, group.getPosition()); - addTextElement(element, "comment", group.getComments()); - - for (final ProcessorNode processor : group.getProcessors()) { - addProcessor(element, processor); - } - - if (group.isRootGroup()) { - for (final Port port : group.getInputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "inputPort"); - } - - for (final Port port : group.getOutputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "outputPort"); - } - } else { - for (final Port port : group.getInputPorts()) { - addPort(element, port, "inputPort"); - } - - for (final Port port : group.getOutputPorts()) { - addPort(element, port, "outputPort"); - } - } - - for (final Label label : group.getLabels()) { - addLabel(element, label); - } - - for (final Funnel funnel : group.getFunnels()) { - addFunnel(element, funnel); - } - - for (final ProcessGroup childGroup : group.getProcessGroups()) { - addProcessGroup(element, childGroup, "processGroup"); - } - - for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) { - addRemoteProcessGroup(element, remoteRef); - } - - for (final Connection connection : group.getConnections()) { - addConnection(element, connection); - } - } - - private void addStyle(final Element parentElement, final Map<String, String> style) { - final Element element = parentElement.getOwnerDocument().createElement("styles"); - - for (final Map.Entry<String, String> entry : style.entrySet()) { - final Element styleElement = parentElement.getOwnerDocument().createElement("style"); - styleElement.setAttribute("name", entry.getKey()); - styleElement.setTextContent(entry.getValue()); - element.appendChild(styleElement); - } - - parentElement.appendChild(element); - } - - private void addLabel(final Element parentElement, final Label label) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("label"); - parentElement.appendChild(element); - addTextElement(element, "id", label.getIdentifier()); - - addPosition(element, label.getPosition()); - addSize(element, label.getSize()); - addStyle(element, label.getStyle()); - - addTextElement(element, "value", label.getValue()); - parentElement.appendChild(element); - } - - private void addFunnel(final Element parentElement, final Funnel funnel) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("funnel"); - parentElement.appendChild(element); - addTextElement(element, "id", funnel.getIdentifier()); - addPosition(element, funnel.getPosition()); - } - - private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("remoteProcessGroup"); - parentElement.appendChild(element); - addTextElement(element, "id", remoteRef.getIdentifier()); - addTextElement(element, "name", remoteRef.getName()); - addPosition(element, remoteRef.getPosition()); - addTextElement(element, "comment", remoteRef.getComments()); - addTextElement(element, "url", remoteRef.getTargetUri().toString()); - addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout()); - addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration()); - addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting())); - - for (final RemoteGroupPort port : remoteRef.getInputPorts()) { - if (port.hasIncomingConnection()) { - addRemoteGroupPort(element, port, "inputPort"); - } - } - - for (final RemoteGroupPort port : remoteRef.getOutputPorts()) { - if (!port.getConnections().isEmpty()) { - addRemoteGroupPort(element, port, "outputPort"); - } - } - - parentElement.appendChild(element); - } - - private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); - parentElement.appendChild(element); - addTextElement(element, "id", port.getIdentifier()); - addTextElement(element, "name", port.getName()); - addPosition(element, port.getPosition()); - addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); - addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); - addTextElement(element, "useCompression", String.valueOf(((RemoteGroupPort) port).isUseCompression())); - - parentElement.appendChild(element); - } - - private void addPort(final Element parentElement, final Port port, final String elementName) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); - parentElement.appendChild(element); - addTextElement(element, "id", port.getIdentifier()); - addTextElement(element, "name", port.getName()); - addPosition(element, port.getPosition()); - addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); - - parentElement.appendChild(element); - } - - private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement(elementName); - parentElement.appendChild(element); - addTextElement(element, "id", port.getIdentifier()); - addTextElement(element, "name", port.getName()); - addPosition(element, port.getPosition()); - addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); - addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks())); - for (final String user : port.getUserAccessControl()) { - addTextElement(element, "userAccessControl", user); - } - for (final String group : port.getGroupAccessControl()) { - addTextElement(element, "groupAccessControl", group); - } - - parentElement.appendChild(element); - } - - private void addProcessor(final Element parentElement, final ProcessorNode processor) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("processor"); - parentElement.appendChild(element); - addTextElement(element, "id", processor.getIdentifier()); - addTextElement(element, "name", processor.getName()); - - addPosition(element, processor.getPosition()); - addStyle(element, processor.getStyle()); - - addTextElement(element, "comment", processor.getComments()); - addTextElement(element, "class", processor.getProcessor().getClass().getCanonicalName()); - addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks()); - addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod()); - addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod()); - addTextElement(element, "yieldPeriod", processor.getYieldPeriod()); - addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString()); - addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); - addTextElement(element, "scheduledState", processor.getScheduledState().name()); - addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); - addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); - - // properties. - for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) { - final PropertyDescriptor descriptor = entry.getKey(); - String value = entry.getValue(); - - if (value != null && descriptor.isSensitive()) { - value = ENC_PREFIX + encryptor.encrypt(value) + ENC_SUFFIX; - } - - if (value == null) { - value = descriptor.getDefaultValue(); - } - - final Element propElement = doc.createElement("property"); - addTextElement(propElement, "name", descriptor.getName()); - if (value != null) { - addTextElement(propElement, "value", value); - } - - element.appendChild(propElement); - } - - final String annotationData = processor.getAnnotationData(); - if (annotationData != null) { - addTextElement(element, "annotationData", annotationData); - } - - for (final Relationship rel : processor.getAutoTerminatedRelationships()) { - addTextElement(element, "autoTerminatedRelationship", rel.getName()); - } - } - - private void addConnection(final Element parentElement, final Connection connection) { - final Document doc = parentElement.getOwnerDocument(); - final Element element = doc.createElement("connection"); - parentElement.appendChild(element); - addTextElement(element, "id", connection.getIdentifier()); - addTextElement(element, "name", connection.getName()); - - final Element bendPointsElement = doc.createElement("bendPoints"); - element.appendChild(bendPointsElement); - for (final Position bendPoint : connection.getBendPoints()) { - addPosition(bendPointsElement, bendPoint, "bendPoint"); - } - - addTextElement(element, "labelIndex", connection.getLabelIndex()); - addTextElement(element, "zIndex", connection.getZIndex()); - - final String sourceId = connection.getSource().getIdentifier(); - final ConnectableType sourceType = connection.getSource().getConnectableType(); - final String sourceGroupId; - if (sourceType == ConnectableType.REMOTE_OUTPUT_PORT) { - sourceGroupId = ((RemoteGroupPort) connection.getSource()).getRemoteProcessGroup().getIdentifier(); - } else { - sourceGroupId = connection.getSource().getProcessGroup().getIdentifier(); - } - - final ConnectableType destinationType = connection.getDestination().getConnectableType(); - final String destinationId = connection.getDestination().getIdentifier(); - final String destinationGroupId; - if (destinationType == ConnectableType.REMOTE_INPUT_PORT) { - destinationGroupId = ((RemoteGroupPort) connection.getDestination()).getRemoteProcessGroup().getIdentifier(); - } else { - destinationGroupId = connection.getDestination().getProcessGroup().getIdentifier(); - } - - addTextElement(element, "sourceId", sourceId); - addTextElement(element, "sourceGroupId", sourceGroupId); - addTextElement(element, "sourceType", sourceType.toString()); - - addTextElement(element, "destinationId", destinationId); - addTextElement(element, "destinationGroupId", destinationGroupId); - addTextElement(element, "destinationType", destinationType.toString()); - - for (final Relationship relationship : connection.getRelationships()) { - addTextElement(element, "relationship", relationship.getName()); - } - - addTextElement(element, "maxWorkQueueSize", connection.getFlowFileQueue().getBackPressureObjectThreshold()); - addTextElement(element, "maxWorkQueueDataSize", connection.getFlowFileQueue().getBackPressureDataSizeThreshold()); - - addTextElement(element, "flowFileExpiration", connection.getFlowFileQueue().getFlowFileExpiration()); - for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) { - final String className = comparator.getClass().getCanonicalName(); - addTextElement(element, "queuePrioritizerClass", className); - } - - parentElement.appendChild(element); - } - - private void addTextElement(final Element element, final String name, final long value) { - addTextElement(element, name, String.valueOf(value)); - } - - private void addTextElement(final Element element, final String name, final String value) { - final Document doc = element.getOwnerDocument(); - final Element toAdd = doc.createElement(name); - toAdd.setTextContent(value); - element.appendChild(toAdd); - } - -}