http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java index c886747..7ad9e84 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java @@ -45,6 +45,11 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity; import java.util.ArrayList; import java.util.Collection; @@ -72,8 +77,14 @@ public class StatusMerger { target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); } - public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { - merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + public 
static void merge(final ProcessGroupStatusDTO target, final boolean targetReadablePermission, final ProcessGroupStatusDTO toMerge, final boolean toMergeReadablePermission, + final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + if (targetReadablePermission && !toMergeReadablePermission) { + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + } + + merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); @@ -86,11 +97,25 @@ public class StatusMerger { } } - public static void merge(final ProcessGroupStatusSnapshotDTO target, final ProcessGroupStatusSnapshotDTO toMerge) { + public static void merge(final ProcessGroupStatusSnapshotEntity target, ProcessGroupStatusSnapshotEntity toMerge) { if (target == null || toMerge == null) { return; } + merge(target.getProcessGroupStatusSnapshot(), target.getCanRead(), toMerge.getProcessGroupStatusSnapshot(), toMerge.getCanRead()); + } + + public static void merge(final ProcessGroupStatusSnapshotDTO target, final boolean targetReadablePermission, final ProcessGroupStatusSnapshotDTO toMerge, + final boolean toMergeReadablePermission) { + if (target == null || toMerge == null) { + return; + } + + if (targetReadablePermission && !toMergeReadablePermission) { + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + } + target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); @@ -117,13 +142,13 @@ public class StatusMerger { // connection status // sort by id - final Map<String, ConnectionStatusSnapshotDTO> mergedConnectionMap = new HashMap<>(); - for (final ConnectionStatusSnapshotDTO status : replaceNull(target.getConnectionStatusSnapshots())) { + final Map<String, ConnectionStatusSnapshotEntity> 
mergedConnectionMap = new HashMap<>(); + for (final ConnectionStatusSnapshotEntity status : replaceNull(target.getConnectionStatusSnapshots())) { mergedConnectionMap.put(status.getId(), status); } - for (final ConnectionStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) { - ConnectionStatusSnapshotDTO merged = mergedConnectionMap.get(statusToMerge.getId()); + for (final ConnectionStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) { + ConnectionStatusSnapshotEntity merged = mergedConnectionMap.get(statusToMerge.getId()); if (merged == null) { mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -134,13 +159,13 @@ public class StatusMerger { target.setConnectionStatusSnapshots(mergedConnectionMap.values()); // processor status - final Map<String, ProcessorStatusSnapshotDTO> mergedProcessorMap = new HashMap<>(); - for (final ProcessorStatusSnapshotDTO status : replaceNull(target.getProcessorStatusSnapshots())) { + final Map<String, ProcessorStatusSnapshotEntity> mergedProcessorMap = new HashMap<>(); + for (final ProcessorStatusSnapshotEntity status : replaceNull(target.getProcessorStatusSnapshots())) { mergedProcessorMap.put(status.getId(), status); } - for (final ProcessorStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) { - ProcessorStatusSnapshotDTO merged = mergedProcessorMap.get(statusToMerge.getId()); + for (final ProcessorStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) { + ProcessorStatusSnapshotEntity merged = mergedProcessorMap.get(statusToMerge.getId()); if (merged == null) { mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -152,13 +177,13 @@ public class StatusMerger { // input ports - final Map<String, PortStatusSnapshotDTO> mergedInputPortMap = new HashMap<>(); - for (final PortStatusSnapshotDTO status : 
replaceNull(target.getInputPortStatusSnapshots())) { + final Map<String, PortStatusSnapshotEntity> mergedInputPortMap = new HashMap<>(); + for (final PortStatusSnapshotEntity status : replaceNull(target.getInputPortStatusSnapshots())) { mergedInputPortMap.put(status.getId(), status); } - for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) { - PortStatusSnapshotDTO merged = mergedInputPortMap.get(statusToMerge.getId()); + for (final PortStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) { + PortStatusSnapshotEntity merged = mergedInputPortMap.get(statusToMerge.getId()); if (merged == null) { mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -169,13 +194,13 @@ public class StatusMerger { target.setInputPortStatusSnapshots(mergedInputPortMap.values()); // output ports - final Map<String, PortStatusSnapshotDTO> mergedOutputPortMap = new HashMap<>(); - for (final PortStatusSnapshotDTO status : replaceNull(target.getOutputPortStatusSnapshots())) { + final Map<String, PortStatusSnapshotEntity> mergedOutputPortMap = new HashMap<>(); + for (final PortStatusSnapshotEntity status : replaceNull(target.getOutputPortStatusSnapshots())) { mergedOutputPortMap.put(status.getId(), status); } - for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) { - PortStatusSnapshotDTO merged = mergedOutputPortMap.get(statusToMerge.getId()); + for (final PortStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) { + PortStatusSnapshotEntity merged = mergedOutputPortMap.get(statusToMerge.getId()); if (merged == null) { mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -186,13 +211,13 @@ public class StatusMerger { target.setOutputPortStatusSnapshots(mergedOutputPortMap.values()); // child groups - final Map<String, ProcessGroupStatusSnapshotDTO> mergedGroupMap = 
new HashMap<>(); - for (final ProcessGroupStatusSnapshotDTO status : replaceNull(target.getProcessGroupStatusSnapshots())) { + final Map<String, ProcessGroupStatusSnapshotEntity> mergedGroupMap = new HashMap<>(); + for (final ProcessGroupStatusSnapshotEntity status : replaceNull(target.getProcessGroupStatusSnapshots())) { mergedGroupMap.put(status.getId(), status); } - for (final ProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) { - ProcessGroupStatusSnapshotDTO merged = mergedGroupMap.get(statusToMerge.getId()); + for (final ProcessGroupStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) { + ProcessGroupStatusSnapshotEntity merged = mergedGroupMap.get(statusToMerge.getId()); if (merged == null) { mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -203,13 +228,13 @@ public class StatusMerger { target.setOutputPortStatusSnapshots(mergedOutputPortMap.values()); // remote groups - final Map<String, RemoteProcessGroupStatusSnapshotDTO> mergedRemoteGroupMap = new HashMap<>(); - for (final RemoteProcessGroupStatusSnapshotDTO status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) { + final Map<String, RemoteProcessGroupStatusSnapshotEntity> mergedRemoteGroupMap = new HashMap<>(); + for (final RemoteProcessGroupStatusSnapshotEntity status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) { mergedRemoteGroupMap.put(status.getId(), status); } - for (final RemoteProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) { - RemoteProcessGroupStatusSnapshotDTO merged = mergedRemoteGroupMap.get(statusToMerge.getId()); + for (final RemoteProcessGroupStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) { + RemoteProcessGroupStatusSnapshotEntity merged = mergedRemoteGroupMap.get(statusToMerge.getId()); if (merged == null) { 
mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; @@ -221,7 +246,7 @@ public class StatusMerger { } private static <T> Collection<T> replaceNull(final Collection<T> collection) { - return (collection == null) ? Collections.<T> emptyList() : collection; + return (collection == null) ? Collections.<T>emptyList() : collection; } @@ -230,7 +255,7 @@ public class StatusMerger { * {@link ProcessGroupStatusSnapshotDTO#setInput(String)} will be called with the pretty-printed form of the * FlowFile counts and sizes retrieved via {@link ProcessGroupStatusSnapshotDTO#getFlowFilesIn()} and * {@link ProcessGroupStatusSnapshotDTO#getBytesIn()}. - * + * <p> * This logic is performed here, rather than in the DTO itself because the DTO needs to be kept purely * getters & setters - otherwise the automatic marshalling and unmarshalling to/from JSON becomes very * complicated. @@ -250,8 +275,16 @@ public class StatusMerger { target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent())); } - public static void merge(final RemoteProcessGroupStatusDTO target, final RemoteProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { - merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + public static void merge(final RemoteProcessGroupStatusDTO target, final boolean targetReadablePermission, final RemoteProcessGroupStatusDTO toMerge, + final boolean toMergeReadablePermission, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + target.setTargetUri(toMerge.getTargetUri()); + } + + merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final 
NodeRemoteProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); @@ -264,8 +297,15 @@ public class StatusMerger { } } - public static void merge(final PortStatusDTO target, final PortStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { - merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + public static void merge(final PortStatusDTO target, final boolean targetReadablePermission, final PortStatusDTO toMerge, final boolean toMergeReadablePermission, final String nodeId, + final String nodeAddress, final Integer nodeApiPort) { + if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + } + + merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodePortStatusSnapshotDTO nodeSnapshot = new NodePortStatusSnapshotDTO(); @@ -278,8 +318,19 @@ public class StatusMerger { } } - public static void merge(final ConnectionStatusDTO target, final ConnectionStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { - merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + public static void merge(final ConnectionStatusDTO target, final boolean targetReadablePermission, final ConnectionStatusDTO toMerge, final boolean toMergeReadablePermission, + final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + target.setSourceId(toMerge.getSourceId()); + target.setSourceName(toMerge.getSourceName()); + target.setDestinationId(toMerge.getDestinationId()); + target.setDestinationName(toMerge.getDestinationName()); + } 
+ + merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodeConnectionStatusSnapshotDTO nodeSnapshot = new NodeConnectionStatusSnapshotDTO(); @@ -292,8 +343,16 @@ public class StatusMerger { } } - public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { - merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + public static void merge(final ProcessorStatusDTO target, final boolean targetReadablePermission, final ProcessorStatusDTO toMerge, final boolean toMergeReadablePermission, + final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + target.setType(toMerge.getType()); + } + + merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); @@ -306,11 +365,27 @@ public class StatusMerger { } } - public static void merge(final ProcessorStatusSnapshotDTO target, final ProcessorStatusSnapshotDTO toMerge) { + public static void merge(final ProcessorStatusSnapshotEntity target, ProcessorStatusSnapshotEntity toMerge) { if (target == null || toMerge == null) { return; } + merge(target.getProcessorStatusSnapshot(), target.getCanRead(), toMerge.getProcessorStatusSnapshot(), toMerge.getCanRead()); + } + + public static void merge(final ProcessorStatusSnapshotDTO target, final boolean targetReadablePermission, final ProcessorStatusSnapshotDTO toMerge, + final boolean toMergeReadablePermission) { + if (target == null || toMerge == null) { + return; + } + 
+ if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + target.setType(toMerge.getType()); + } + // if the status to merge is invalid allow it to take precedence. whether the // processor run status is disabled/stopped/running is part of the flow configuration // and should not differ amongst nodes. however, whether a processor is invalid @@ -345,12 +420,30 @@ public class StatusMerger { target.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(target.getTasksDurationNanos(), TimeUnit.NANOSECONDS)); } + public static void merge(final ConnectionStatusSnapshotEntity target, ConnectionStatusSnapshotEntity toMerge) { + if (target == null || toMerge == null) { + return; + } + + merge(target.getConnectionStatusSnapshot(), target.getCanRead(), toMerge.getConnectionStatusSnapshot(), toMerge.getCanRead()); + } - public static void merge(final ConnectionStatusSnapshotDTO target, final ConnectionStatusSnapshotDTO toMerge) { + public static void merge(final ConnectionStatusSnapshotDTO target, final boolean targetReadablePermission, final ConnectionStatusSnapshotDTO toMerge, + final boolean toMergeReadablePermission) { if (target == null || toMerge == null) { return; } + if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + target.setSourceId(toMerge.getSourceId()); + target.setSourceName(toMerge.getSourceName()); + target.setDestinationId(toMerge.getDestinationId()); + target.setDestinationName(toMerge.getDestinationName()); + } + target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); @@ -369,8 +462,27 @@ public class StatusMerger { } + public static void merge(final 
RemoteProcessGroupStatusSnapshotEntity target, RemoteProcessGroupStatusSnapshotEntity toMerge) { + if (target == null || toMerge == null) { + return; + } + + merge(target.getRemoteProcessGroupStatusSnapshot(), target.getCanRead(), toMerge.getRemoteProcessGroupStatusSnapshot(), toMerge.getCanRead()); + } + + public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final boolean targetReadablePermission, final RemoteProcessGroupStatusSnapshotDTO toMerge, + final boolean toMergeReadablePermission) { + if (target == null || toMerge == null) { + return; + } + + if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + target.setName(toMerge.getName()); + target.setTargetUri(toMerge.getTargetUri()); + } - public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final RemoteProcessGroupStatusSnapshotDTO toMerge) { final String transmittingValue = TransmissionStatus.Transmitting.name(); if (transmittingValue.equals(target.getTransmissionStatus()) || transmittingValue.equals(toMerge.getTransmissionStatus())) { target.setTransmissionStatus(transmittingValue); @@ -391,12 +503,25 @@ public class StatusMerger { } + public static void merge(final PortStatusSnapshotEntity target, PortStatusSnapshotEntity toMerge) { + if (target == null || toMerge == null) { + return; + } - public static void merge(final PortStatusSnapshotDTO target, final PortStatusSnapshotDTO toMerge) { + merge(target.getPortStatusSnapshot(), target.getCanRead(), toMerge.getPortStatusSnapshot(), toMerge.getCanRead()); + } + + public static void merge(final PortStatusSnapshotDTO target, final boolean targetReadablePermission, final PortStatusSnapshotDTO toMerge, final boolean toMergeReadablePermission) { if (target == null || toMerge == null) { return; } + if (targetReadablePermission && !toMergeReadablePermission) { + target.setGroupId(toMerge.getGroupId()); + target.setId(toMerge.getId()); + 
target.setName(toMerge.getName()); + } + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy new file mode 100644 index 0000000..bd9b265 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.coordination.http + +import com.sun.jersey.api.client.ClientResponse +import org.apache.nifi.cluster.manager.NodeResponse +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.util.NiFiProperties +import org.apache.nifi.web.api.dto.AccessPolicyDTO +import org.apache.nifi.web.api.dto.ConnectionDTO +import org.apache.nifi.web.api.dto.ControllerConfigurationDTO +import org.apache.nifi.web.api.dto.FunnelDTO +import org.apache.nifi.web.api.dto.LabelDTO +import org.apache.nifi.web.api.dto.PermissionsDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO +import org.apache.nifi.web.api.entity.ConnectionEntity +import org.apache.nifi.web.api.entity.ConnectionsEntity +import org.apache.nifi.web.api.entity.ControllerConfigurationEntity +import org.apache.nifi.web.api.entity.FunnelEntity +import org.apache.nifi.web.api.entity.FunnelsEntity +import org.apache.nifi.web.api.entity.LabelEntity +import org.apache.nifi.web.api.entity.LabelsEntity +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.map.SerializationConfig +import org.codehaus.jackson.map.annotate.JsonSerialize +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector +import spock.lang.Specification +import spock.lang.Unroll + +@Unroll +class StandardHttpResponseMergerSpec extends Specification { + + def setup() { + System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties" + } + + def cleanup() { + System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH + } + + def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() { + given: + def responseMerger = new StandardHttpResponseMerger() + def requestUri = new URI('http://server/resource') + def requestId = UUID.randomUUID().toString() + def Map<ClientResponse, Map<String, Integer>> mockToRequestEntity = [:] + def nodeResponseSet = 
nodeResponseData.collect { + int n = it.node + def clientResponse = Mock(ClientResponse) + mockToRequestEntity.put clientResponse, it + new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId) + } as Set + + when: + def returnedResponse = responseMerger.mergeResponses(requestUri, 'get', nodeResponseSet).getStatus() + + then: + mockToRequestEntity.entrySet().forEach { + ClientResponse mockClientResponse = it.key + _ * mockClientResponse.getStatus() >> it.value.status + } + 0 * _ + returnedResponse == expectedStatus + + where: + nodeResponseData || expectedStatus + [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 401]] as Set || 401 + [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 403]] as Set || 403 + [[node: 1, status: 200], [node: 2, status: 403], [node: 3, status: 500]] as Set || 403 + [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 500]] as Set || 500 + } + + def "MergeResponses: #responseEntities.size() HTTP 200 #httpMethod responses for #requestUriPart"() { + given: "json serialization setup" + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + + and: "setup of the data to be used in the test" + def responseMerger = new StandardHttpResponseMerger() + def requestUri = new URI("http://server/$requestUriPart") + def requestId = UUID.randomUUID().toString() + def Map<ClientResponse, Object> mockToRequestEntity = [:] + def n = 0 + def nodeResponseSet = responseEntities.collect { + ++n + def clientResponse = Mock(ClientResponse) + mockToRequestEntity.put clientResponse, it + new NodeResponse(new 
NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId) + } as Set + + when: + def returnedResponse = responseMerger.mergeResponses(requestUri, httpMethod, nodeResponseSet) + + then: + mockToRequestEntity.entrySet().forEach { + ClientResponse mockClientResponse = it.key + def entity = it.value + _ * mockClientResponse.getStatus() >> 200 + 1 * mockClientResponse.getEntity(_) >> entity + } + responseEntities.size() == mockToRequestEntity.size() + 0 * _ + def returnedJson = mapper.writeValueAsString(returnedResponse.getUpdatedEntity()) + def expectedJson = mapper.writeValueAsString(expectedEntity) + returnedJson == expectedJson + + where: + requestUriPart | httpMethod | responseEntities || + expectedEntity + 'nifi-api/controller/config' | 'get' | [ + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] || + // expectedEntity + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)) + 'nifi-api/controller/config' | 'put' | [ + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new 
ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] || + // expectedEntity + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)) + "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'get' | [ + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] as Set), + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 100)))] as Set), + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 500)), component: new ConnectionDTO())] as Set)] || + // expectedEntity + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, + input: '0 (900 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))] as Set) + "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'post' | [ + new ConnectionEntity(id: '1', 
permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO()), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] || + // expectedEntity + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, input: '0 (900 bytes)', + output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0))) + "nifi-api/connections/${UUID.randomUUID()}" | 'get' | [ + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 400)), component: new ConnectionDTO()), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] || + // expectedEntity + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 1000, + input: '0 (1,000 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', 
queuedCount: 0))) + "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'get' | [ + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set), + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set), + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set)] || + // expectedEntity + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set) + "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'post' | [ + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] || + // expectedEntity + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + "nifi-api/labels/${UUID.randomUUID()}" | 'get' | [ + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] || + // expectedEntity + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'get' | [ + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set), + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new 
PermissionsDTO(canRead: false, canWrite: false))] as Set), + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set)] || + // expectedEntity + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set) + "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'post' | [ + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] || + // expectedEntity + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + "nifi-api/funnels/${UUID.randomUUID()}" | 'get' | [ + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] || + // expectedEntity + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy new file mode 100644 index 0000000..69dd82a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.coordination.http.endpoints + +import com.sun.jersey.api.client.ClientResponse +import org.apache.nifi.cluster.manager.NodeResponse +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.util.NiFiProperties +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO +import org.apache.nifi.web.api.entity.StatusHistoryEntity +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.map.SerializationConfig +import org.codehaus.jackson.map.annotate.JsonSerialize +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector +import spock.lang.Specification +import spock.lang.Unroll + +class StatusHistoryEndpointMergerSpec extends Specification { /* Spock specification for StatusHistoryEndpointMerger: merges StatusHistoryEntity node responses and checks canRead and componentDetails of the merged entity. */ + + def setup() { /* Points the NiFiProperties.PROPERTIES_FILE_PATH system property at the test nifi.properties file. */ + System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties" + } + + def cleanup() { /* Clears the system property assigned in setup(). */ + System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH + } + + @Unroll + def "Merge component details based on permission"() { /* Builds one NodeResponse per entry of responseEntities, each wrapping a mocked ClientResponse, and passes the resulting set to merger.merge. NOTE(review): every NodeResponse is created with the literal "get" rather than the httpMethod column, and the mapper configured under given: is not referenced again in this feature. */ + given: "json serialization setup" + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + + and: "setup of the data to be used in the test" + def merger = new StatusHistoryEndpointMerger() + def requestUri = new URI("http://server/$requestUriPart") + def requestId = UUID.randomUUID().toString() + def Map<ClientResponse, Object> mockToRequestEntity = [:] + def n = 0 + def nodeResponseSet = responseEntities.collect { + ++n + def clientResponse = Mock(ClientResponse) + mockToRequestEntity.put clientResponse, it + new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId) + } as Set + + 
when: + def returnedResponse = merger.merge(requestUri, httpMethod, nodeResponseSet, [] as Set, nodeResponseSet[0]) + + then: + mockToRequestEntity.entrySet().forEach { /* each mocked ClientResponse answers getStatus() with 200 any number of times and must return its entity from exactly one getEntity(_) call */ + ClientResponse mockClientResponse = it.key + def entity = it.value + _ * mockClientResponse.getStatus() >> 200 + 1 * mockClientResponse.getEntity(_) >> entity + } + responseEntities.size() == mockToRequestEntity.size() + 0 * _ + (returnedResponse.getUpdatedEntity() as StatusHistoryEntity).canRead == expectedEntity.canRead + (returnedResponse.getUpdatedEntity() as StatusHistoryEntity).statusHistory.componentDetails == expectedEntity.statusHistory.componentDetails + + where: + requestUriPart | httpMethod | responseEntities || + expectedEntity + "/nifi-api/flow/connections/${UUID.randomUUID()}/status/history" | 'get' | [ + new StatusHistoryEntity(canRead: true, statusHistory: new StatusHistoryDTO(componentDetails: [key1: 'real', key2: 'real'], nodeSnapshots: [], aggregateSnapshots: [])), + new StatusHistoryEntity(canRead: false, statusHistory: new StatusHistoryDTO(componentDetails: [key1: 'hidden', key2: 'hidden'], nodeSnapshots: [], aggregateSnapshots: [])), + new StatusHistoryEntity(canRead: true, statusHistory: new StatusHistoryDTO(componentDetails: [key1: 'real', key2: 'real'], nodeSnapshots: [], aggregateSnapshots: [])) + ] || + new StatusHistoryEntity(canRead: false, statusHistory: new StatusHistoryDTO(componentDetails: [key1: 'hidden', key2: 'hidden'], nodeSnapshots: [], aggregateSnapshots: [])) + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy new file mode 100644 index 0000000..3f81975 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.manager + +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.web.api.dto.ConnectionDTO +import org.apache.nifi.web.api.dto.ControllerConfigurationDTO +import org.apache.nifi.web.api.dto.PermissionsDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO +import org.apache.nifi.web.api.entity.ConnectionEntity +import org.apache.nifi.web.api.entity.ConnectionsEntity +import org.apache.nifi.web.api.entity.ControllerConfigurationEntity +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.map.SerializationConfig +import org.codehaus.jackson.map.annotate.JsonSerialize +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector +import spock.lang.Specification +import spock.lang.Unroll + +class ConnectionEntityMergerSpec extends Specification { /* Spock specification for ConnectionEntityMerger.merge; the merged entity and the expected entity are compared through their JSON serializations. */ + + @Unroll + def "Merge"() { /* Calls merge(entity, nodeEntityMap), where entity is the value of the map's first entry, then serializes entity and expectedMergedEntity with the same mapper (NON_NULL inclusion, JAXB annotation introspector) and compares the two strings. */ + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)) + def entity = nodeEntityMap.entrySet().first().value + + when: + new ConnectionEntityMerger().merge(entity, nodeEntityMap) + + then: + def mergedEntityJson = mapper.writeValueAsString(entity) + def expectedJson = mapper.writeValueAsString(expectedMergedEntity) + mergedEntityJson == expectedJson + + where: + nodeEntityMap || + expectedMergedEntity + [(createNodeIdentifier(1)): new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO()), + (createNodeIdentifier(2)): new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: 
false, canWrite: false), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 100))), + (createNodeIdentifier(3)): new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 500)), component: new ConnectionDTO())] || + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, input: '0 (900 bytes)', + output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0))) + + } + + def createNodeIdentifier(int id) { /* Builds a NodeIdentifier named "cluster-node-<id>" whose numeric arguments are id, id * 10, id * 100 and id * 1000. */ + new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null) + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy new file mode 100644 index 0000000..8b2c76c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager + +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.controller.service.ControllerServiceState +import org.apache.nifi.web.api.dto.ConnectionDTO +import org.apache.nifi.web.api.dto.ControllerServiceDTO +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO +import org.apache.nifi.web.api.dto.PermissionsDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO +import org.apache.nifi.web.api.entity.ConnectionEntity +import org.apache.nifi.web.api.entity.ControllerServiceEntity +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.map.SerializationConfig +import org.codehaus.jackson.map.annotate.JsonSerialize +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector +import spock.lang.Specification +import spock.lang.Unroll + +@Unroll +class ControllerServiceEntityMergerSpec extends Specification { /* Spock specification for ControllerServiceEntityMerger.merge; merged and expected entities are compared through their JSON serializations. */ + def "MergeComponents"() { /* Calls merge(entity, nodeEntityMap) with the value of the map's first entry and compares the JSON of entity with the JSON of expectedMergedEntity. NOTE(review): the setup statements below are not preceded by a given: label. */ + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + 
mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)) + def entity = nodeEntityMap.entrySet().first().value + + when: + new ControllerServiceEntityMerger().merge(entity, nodeEntityMap) + + then: + def mergedEntityJson = mapper.writeValueAsString(entity) + def expectedJson = mapper.writeValueAsString(expectedMergedEntity) + mergedEntityJson == expectedJson + + where: + nodeEntityMap || + expectedMergedEntity + // Simple ControllerServiceEntity merging + [(createNodeIdentifier(1)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO()), + (createNodeIdentifier(2)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + component: new ControllerServiceDTO()), + (createNodeIdentifier(3)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO())] || + new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + // Controller Reference merging for canRead==false + [(createNodeIdentifier(1)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO(referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))])), + (createNodeIdentifier(2)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO(referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: false, canWrite: false), + component: new 
ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))])), + (createNodeIdentifier(3)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO(referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))]))] || + new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + bulletins: [], + component: new ControllerServiceDTO(validationErrors: [], + referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: false, canWrite: false))])) + // Controller Reference merging for canRead==true + [(createNodeIdentifier(1)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO(referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))])), + (createNodeIdentifier(2)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO(referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))])), + (createNodeIdentifier(3)): new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceDTO(referencingComponents: [new 
ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 1, state: ControllerServiceState.ENABLING.name()))]))] || + new ControllerServiceEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), + bulletins: [], + component: new ControllerServiceDTO(validationErrors: [], + referencingComponents: [new ControllerServiceReferencingComponentEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerServiceReferencingComponentDTO(activeThreadCount: 3, state: ControllerServiceState.ENABLING.name()))])) + } + + def "MergeControllerServiceReferences"() { /* NOTE(review): placeholder, the body contains no statements. */ + + } + + def createNodeIdentifier(int id) { /* Builds a NodeIdentifier named "cluster-node-<id>" whose numeric arguments are id, id * 10, id * 100 and id * 1000. */ + new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null) + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy new file mode 100644 index 0000000..5ebdf0e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager + +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.web.api.dto.LabelDTO +import org.apache.nifi.web.api.dto.PermissionsDTO +import org.apache.nifi.web.api.entity.LabelEntity +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.map.SerializationConfig +import org.codehaus.jackson.map.annotate.JsonSerialize +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector +import spock.lang.Specification +import spock.lang.Unroll + +@Unroll +class LabelEntityMergerSpec extends Specification { /* Spock specification for LabelEntityMerger.merge; the merged entity and the expected entity are compared through their JSON serializations. */ + def "Merge"() { /* Calls merge(entity, nodeEntityMap), where entity is the value of the map's first entry, then serializes entity and expectedMergedEntity with the same mapper (NON_NULL inclusion, JAXB annotation introspector) and compares the two strings. */ + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)) + def entity = nodeEntityMap.entrySet().first().value + + when: + new LabelEntityMerger().merge(entity, nodeEntityMap) + + then: + def mergedEntityJson = mapper.writeValueAsString(entity) + def expectedJson = mapper.writeValueAsString(expectedMergedEntity) + mergedEntityJson == expectedJson + + where: + nodeEntityMap || + expectedMergedEntity + [(createNodeIdentifier(1)): new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), 
component: new LabelDTO(label: 'label')), + (createNodeIdentifier(2)): new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + (createNodeIdentifier(3)): new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO(label: 'label'))] || + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + + } + + def createNodeIdentifier(int id) { /* Builds a NodeIdentifier named "cluster-node-<id>" whose numeric arguments are id, id * 10, id * 100 and id * 1000. */ + new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null) + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PermissionBasedStatusMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PermissionBasedStatusMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PermissionBasedStatusMergerSpec.groovy new file mode 100644 index 0000000..f922fff --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/PermissionBasedStatusMergerSpec.groovy @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager + +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO +import org.apache.nifi.web.api.dto.status.PortStatusDTO +import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO +import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.map.SerializationConfig +import org.codehaus.jackson.map.annotate.JsonSerialize +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector +import spock.lang.Specification +import spock.lang.Unroll + +@Unroll +class PermissionBasedStatusMergerSpec extends Specification { + def "Merge ConnectionStatusDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def 
merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead, 'nodeid', 'nodeaddress', 1234) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new ConnectionStatusDTO(groupId: 'real', id: 'real', name: 'real', sourceId: 'real', sourceName: 'real', destinationId: 'real', destinationName: 'real') | true | + new ConnectionStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', + destinationName: 'hidden') | false || + new ConnectionStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', + destinationName: 'hidden') + new ConnectionStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', destinationName: 'hidden') | false | + new ConnectionStatusDTO(groupId: 'real', id: 'real', name: 'real', sourceId: 'real', sourceName: 'real', destinationId: 'real', destinationName: 'real') | true || + new ConnectionStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', destinationName: 'hidden') + } + + def "Merge ConnectionStatusSnapshotDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = 
mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new ConnectionStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real', sourceId: 'real', sourceName: 'real', destinationId: 'real', destinationName: 'real') | true | + new ConnectionStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', + destinationName: 'hidden') | false || + new ConnectionStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', + destinationName: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: '0') + new ConnectionStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', + destinationName: 'hidden') | false | + new ConnectionStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real', sourceId: 'real', sourceName: 'real', destinationId: 'real', destinationName: 'real') | true || + new ConnectionStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', sourceId: 'hidden', sourceName: 'hidden', destinationId: 'hidden', + destinationName: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: '0') + } + + def "Merge PortStatusDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead, 'nodeid', 'nodeaddress', 1234) + + then: + def 
returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new PortStatusDTO(groupId: 'real', id: 'real', name: 'real') | true | + new PortStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden') | false || + new PortStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden') + new PortStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden') | false | + new PortStatusDTO(groupId: 'real', id: 'real', name: 'real') | true || + new PortStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden') + } + + def "Merge PortStatusSnapshotDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new PortStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real') | true | + new PortStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden') | false || + new PortStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', transmitting: false) + new PortStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden') | false | + new PortStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real') | true || + new PortStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', 
transmitting: false) + } + + def "Merge ProcessGroupStatusDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead, 'nodeid', 'nodeaddress', 1234) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new ProcessGroupStatusDTO(id: 'real', name: 'real') | true | new ProcessGroupStatusDTO(id: 'hidden', name: 'hidden') | false || + new ProcessGroupStatusDTO(id: 'hidden', name: 'hidden') + new ProcessGroupStatusDTO(id: 'hidden', name: 'hidden') | false | new ProcessGroupStatusDTO(id: 'real', name: 'real') | true || + new ProcessGroupStatusDTO(id: 'hidden', name: 'hidden') + } + + def "Merge ProcessGroupStatusSnapshotDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new ProcessGroupStatusSnapshotDTO(id: 'real', name: 'real') | true | new 
ProcessGroupStatusSnapshotDTO(id: 'hidden', name: 'hidden') | false || + new ProcessGroupStatusSnapshotDTO(id: 'hidden', name: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', transferred: '0 (0 bytes)', read: '0 bytes', written: '0' + + ' bytes', + queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: '0', received: '0 (0 bytes)', sent: '0 (0 bytes)', connectionStatusSnapshots: [], inputPortStatusSnapshots: [], + outputPortStatusSnapshots: [], processorStatusSnapshots: [], remoteProcessGroupStatusSnapshots: []) + new ProcessGroupStatusSnapshotDTO(id: 'hidden', name: 'hidden') | false | new ProcessGroupStatusSnapshotDTO(id: 'real', name: 'real') | true || + new ProcessGroupStatusSnapshotDTO(id: 'hidden', name: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', transferred: '0 (0 bytes)', read: '0 bytes', written: '0 bytes', + queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: '0', received: '0 (0 bytes)', sent: '0 (0 bytes)', connectionStatusSnapshots: [], inputPortStatusSnapshots: [], + outputPortStatusSnapshots: [], processorStatusSnapshots: [], remoteProcessGroupStatusSnapshots: []) + } + + def "Merge ProcessorStatusDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead, 'nodeid', 'nodeaddress', 1234) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new ProcessorStatusDTO(groupId: 'real', id: 'real', name: 'real', type: 'real') | true | + new 
ProcessorStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden') | false || + new ProcessorStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden') + new ProcessorStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden') | false | + new ProcessorStatusDTO(groupId: 'real', id: 'real', name: 'real', type: 'real') | true || + new ProcessorStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden') + } + + def "Merge ProcessorStatusSnapshotDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new ProcessorStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden') | false | + new ProcessorStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real', type: 'real') | true || + new ProcessorStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', read: '0 bytes', + written: '0 bytes', tasks: '0', tasksDuration: '00:00:00.000') + new ProcessorStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real', type: 'real') | true | + new ProcessorStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden') | false || + new ProcessorStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', type: 'hidden', input: '0 (0 bytes)', output: '0 (0 bytes)', read: '0 
bytes', + written: '0 bytes', tasks: '0', tasksDuration: '00:00:00.000') + } + + def "Merge RemoteProcessGroupStatusDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, toMergeCanRead, 'nodeid', 'nodeaddress', 1234) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new RemoteProcessGroupStatusDTO(groupId: 'real', id: 'real', name: 'real', targetUri: 'real') | true | + new RemoteProcessGroupStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden') | false || + new RemoteProcessGroupStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden') + new RemoteProcessGroupStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden') | false | + new RemoteProcessGroupStatusDTO(groupId: 'real', id: 'real', name: 'real', targetUri: 'real') | true || + new RemoteProcessGroupStatusDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden') + } + + def "Merge RemoteProcessGroupStatusSnapshotDTO"() { + given: + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + def merger = new StatusMerger() + + when: + merger.merge(target, targetCanRead, toMerge, 
toMergeCanRead) + + then: + def returnedJson = mapper.writeValueAsString(target) + def expectedJson = mapper.writeValueAsString(expectedDto) + returnedJson == expectedJson + + where: + target | targetCanRead | + toMerge | toMergeCanRead || + expectedDto + new RemoteProcessGroupStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real', targetUri: 'real') | true | + new RemoteProcessGroupStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden') | false || + new RemoteProcessGroupStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden', received: '0 (0 bytes)', sent: '0 (0 bytes)') + new RemoteProcessGroupStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden') | false | + new RemoteProcessGroupStatusSnapshotDTO(groupId: 'real', id: 'real', name: 'real', targetUri: 'real') | true || + new RemoteProcessGroupStatusSnapshotDTO(groupId: 'hidden', id: 'hidden', name: 'hidden', targetUri: 'hidden', received: '0 (0 bytes)', sent: '0 (0 bytes)') + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java index a114a00..5875249 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java @@ -17,12 +17,12 @@ package org.apache.nifi.controller.status.history; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; +import java.util.List; +import java.util.concurrent.TimeUnit; + public enum RemoteProcessGroupStatusDescriptor { SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)",