http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java index bef75a0..6b9b080 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java @@ -17,22 +17,18 @@ package org.apache.nifi.cluster.coordination.http.endpoints; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.ProcessGroupEntityMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.entity.ProcessGroupEntity; -public class ProcessGroupEndpointMerger implements EndpointResponseMerger { +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class 
ProcessGroupEndpointMerger extends AbstractSingleEntityEndpoint<ProcessGroupEntity> implements EndpointResponseMerger { public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); @Override @@ -41,67 +37,12 @@ public class ProcessGroupEndpointMerger implements EndpointResponseMerger { } @Override - public NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { - if (!canHandle(uri, method)) { - throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); - } - - final ProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class); - final ProcessGroupDTO responseDto = responseEntity.getComponent(); - - final FlowSnippetDTO contents = responseDto.getContents(); - if (contents == null) { - return new NodeResponse(clientResponse, responseEntity); - } else { - final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>(); - final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>(); - - for (final NodeResponse nodeResponse : successfulResponses) { - final ProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? 
responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class); - final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getComponent(); - - for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) { - Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId()); - if (innerMap == null) { - innerMap = new HashMap<>(); - processorMap.put(nodeProcessor.getId(), innerMap); - } - - innerMap.put(nodeResponse.getNodeId(), nodeProcessor); - } - - for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeProcessGroup.getContents().getRemoteProcessGroups()) { - Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId()); - if (innerMap == null) { - innerMap = new HashMap<>(); - remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap); - } - - innerMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup); - } - } - - final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger(); - for (final ProcessorDTO processor : contents.getProcessors()) { - final String procId = processor.getId(); - final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId); - - procMerger.mergeResponses(processor, mergeMap, successfulResponses, problematicResponses); - } - - final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger(); - for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) { - if (remoteProcessGroup.getContents() != null) { - final String remoteProcessGroupId = remoteProcessGroup.getId(); - final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId); - - rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses); - } - } - } - - // create a new client response - return new NodeResponse(clientResponse, responseEntity); + protected Class<ProcessGroupEntity> 
getEntityClass() { + return ProcessGroupEntity.class; } + @Override + protected void mergeResponses(ProcessGroupEntity clientEntity, Map<NodeIdentifier, ProcessGroupEntity> entityMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + ProcessGroupEntityMerger.mergeProcessGroups(clientEntity, entityMap); + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java new file mode 100644 index 0000000..d4f047e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.ProcessGroupsEntityMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupsEntity; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class ProcessGroupsEndpointMerger implements EndpointResponseMerger { + public static final Pattern PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/process-groups"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final ProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupsEntity.class); + final Set<ProcessGroupEntity> processGroupEntities = responseEntity.getProcessGroups(); + + final Map<String, Map<NodeIdentifier, ProcessGroupEntity>> entityMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final ProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? 
responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupsEntity.class); + final Set<ProcessGroupEntity> nodeProcessGroupEntities = nodeResponseEntity.getProcessGroups(); + + for (final ProcessGroupEntity nodeProcessGroupEntity : nodeProcessGroupEntities) { + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + Map<NodeIdentifier, ProcessGroupEntity> innerMap = entityMap.get(nodeProcessGroupEntity.getId()); /* outer map is keyed by entity id, not node id; looking up by nodeId never matched and dropped every node but the last */ + if (innerMap == null) { + innerMap = new HashMap<>(); + entityMap.put(nodeProcessGroupEntity.getId(), innerMap); + } + + innerMap.put(nodeId, nodeProcessGroupEntity); + } + } + + ProcessGroupsEntityMerger.mergeProcessGroups(processGroupEntities, entityMap); + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java index bd50eca..b8202c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java @@ -17,28 +17,24 @@ package 
org.apache.nifi.cluster.coordination.http.endpoints; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import 
java.util.Set; -import java.util.regex.Pattern; - import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.manager.ProcessorEntityMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.entity.ProcessorEntity; +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + public class ProcessorEndpointMerger extends AbstractSingleEntityEndpoint<ProcessorEntity> implements EndpointResponseMerger { - public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/processors"); + public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}"); - public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}"); @Override public boolean canHandle(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) - && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) { return true; } else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; @@ -53,42 +49,10 @@ public class ProcessorEndpointMerger extends AbstractSingleEntityEndpoint<Proces } - protected void mergeResponses(final ProcessorDTO clientDto, final Map<NodeIdentifier, ProcessorDTO> dtoMap, final Set<NodeResponse> successfulResponses, - final Set<NodeResponse> 
problematicResponses) { - final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); - - for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : dtoMap.entrySet()) { - final NodeIdentifier nodeId = nodeEntry.getKey(); - final ProcessorDTO nodeProcessor = nodeEntry.getValue(); - - // merge the validation errors - mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors()); - } - - // set the merged the validation errors - clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size())); - } - @Override protected void mergeResponses(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses) { - final ProcessorDTO clientDto = clientEntity.getComponent(); - final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>(); - for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { - final ProcessorEntity nodeProcEntity = entry.getValue(); - final ProcessorDTO nodeProcDto = nodeProcEntity.getComponent(); - dtoMap.put(entry.getKey(), nodeProcDto); - } - - for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final ProcessorEntity entity = entry.getValue(); - if (entity != clientEntity) { - StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + ProcessorEntityMerger.mergeProcessors(clientEntity, entityMap); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java index fa076b9..da84c58 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java @@ -17,21 +17,21 @@ package org.apache.nifi.cluster.coordination.http.endpoints; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.ProcessorsEntityMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + public class ProcessorsEndpointMerger implements EndpointResponseMerger { - public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); + public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); @Override public boolean canHandle(final URI uri, final 
String method) { @@ -47,30 +47,24 @@ public class ProcessorsEndpointMerger implements EndpointResponseMerger { final ProcessorsEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorsEntity.class); final Set<ProcessorEntity> processorEntities = responseEntity.getProcessors(); - final Map<String, Map<NodeIdentifier, ProcessorDTO>> dtoMap = new HashMap<>(); + final Map<String, Map<NodeIdentifier, ProcessorEntity>> entityMap = new HashMap<>(); for (final NodeResponse nodeResponse : successfulResponses) { final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class); final Set<ProcessorEntity> nodeProcessorEntities = nodeResponseEntity.getProcessors(); for (final ProcessorEntity nodeProcessorEntity : nodeProcessorEntities) { final NodeIdentifier nodeId = nodeResponse.getNodeId(); - Map<NodeIdentifier, ProcessorDTO> innerMap = dtoMap.get(nodeId); + Map<NodeIdentifier, ProcessorEntity> innerMap = entityMap.get(nodeProcessorEntity.getId()); /* outer map is keyed by entity id, not node id; looking up by nodeId never matched and dropped every node but the last */ if (innerMap == null) { innerMap = new HashMap<>(); - dtoMap.put(nodeProcessorEntity.getId(), innerMap); + entityMap.put(nodeProcessorEntity.getId(), innerMap); } - innerMap.put(nodeResponse.getNodeId(), nodeProcessorEntity.getComponent()); + innerMap.put(nodeId, nodeProcessorEntity); } } - final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger(); - for (final ProcessorEntity entity : processorEntities) { - final String componentId = entity.getId(); - final Map<NodeIdentifier, ProcessorDTO> mergeMap = dtoMap.get(componentId); - - procMerger.mergeResponses(entity.getComponent(), mergeMap, successfulResponses, problematicResponses); - } + ProcessorsEntityMerger.mergeProcessors(processorEntities, entityMap); // create a new client response return new NodeResponse(clientResponse, responseEntity); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java index 732d527..e130e5e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java @@ -17,24 +17,19 @@ package org.apache.nifi.cluster.coordination.http.endpoints; +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.RemoteProcessGroupEntityMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; + import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; 
-import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; - -public class RemoteProcessGroupEndpointMerger extends AbstractSingleDTOEndpoint<RemoteProcessGroupEntity, RemoteProcessGroupDTO> { - public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups"); +public class RemoteProcessGroupEndpointMerger extends AbstractSingleEntityEndpoint<RemoteProcessGroupEntity> implements EndpointResponseMerger { + public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups/[a-f0-9\\-]{36}"); @Override @@ -54,76 +49,9 @@ public class RemoteProcessGroupEndpointMerger extends AbstractSingleDTOEndpoint< } @Override - protected RemoteProcessGroupDTO getDto(final RemoteProcessGroupEntity entity) { - return entity.getComponent(); - } - - @Override - protected void mergeResponses(RemoteProcessGroupDTO clientDto, Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - final RemoteProcessGroupContentsDTO remoteProcessGroupContents = clientDto.getContents(); - - Boolean mergedIsTargetSecure = null; - final List<String> mergedAuthorizationIssues = new ArrayList<>(); - final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new HashSet<>(); - final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>(); - - for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry : dtoMap.entrySet()) { - final NodeIdentifier nodeId = nodeEntry.getKey(); - final RemoteProcessGroupDTO nodeRemoteProcessGroupDto = nodeEntry.getValue(); - - // merge the issues - final List<String> nodeAuthorizationIssues = nodeRemoteProcessGroupDto.getAuthorizationIssues(); - if (nodeAuthorizationIssues != null && !nodeAuthorizationIssues.isEmpty()) { - for 
(final String nodeAuthorizationIssue : nodeAuthorizationIssues) { - mergedAuthorizationIssues.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + nodeAuthorizationIssue); - } - } - - // use the first target secure flag since they will all be the same - final Boolean nodeIsTargetSecure = nodeRemoteProcessGroupDto.isTargetSecure(); - if (mergedIsTargetSecure == null) { - mergedIsTargetSecure = nodeIsTargetSecure; - } - - // merge the ports in the contents - final RemoteProcessGroupContentsDTO nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroupDto.getContents(); - if (remoteProcessGroupContents != null && nodeRemoteProcessGroupContentsDto != null) { - if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) { - mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts()); - } - if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != null) { - mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts()); - } - } - } - - if (remoteProcessGroupContents != null) { - if (!mergedInputPorts.isEmpty()) { - remoteProcessGroupContents.setInputPorts(mergedInputPorts); - } - if (!mergedOutputPorts.isEmpty()) { - remoteProcessGroupContents.setOutputPorts(mergedOutputPorts); - } - } - - if (mergedIsTargetSecure != null) { - clientDto.setTargetSecure(mergedIsTargetSecure); - } - - if (!mergedAuthorizationIssues.isEmpty()) { - clientDto.setAuthorizationIssues(mergedAuthorizationIssues); - } - } - protected void mergeResponses(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap, - Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - - final RemoteProcessGroupDTO clientDto = clientEntity.getComponent(); - final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new HashMap<>(); - for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) { - dtoMap.put(entry.getKey(), entry.getValue().getComponent()); - } + 
Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + RemoteProcessGroupEntityMerger.mergeRemoteProcessGroups(clientEntity, entityMap); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java index c387951..c95aa9b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java @@ -17,21 +17,21 @@ package org.apache.nifi.cluster.coordination.http.endpoints; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.RemoteProcessGroupsEntityMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; 
+import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger { - public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); + public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); @Override public boolean canHandle(final URI uri, final String method) { @@ -47,30 +47,24 @@ public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class); final Set<RemoteProcessGroupEntity> rpgEntities = responseEntity.getRemoteProcessGroups(); - final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> dtoMap = new HashMap<>(); + final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap = new HashMap<>(); for (final NodeResponse nodeResponse : successfulResponses) { final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? 
responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class); final Set<RemoteProcessGroupEntity> nodeRpgEntities = nodeResponseEntity.getRemoteProcessGroups(); for (final RemoteProcessGroupEntity nodeRpgEntity : nodeRpgEntities) { final NodeIdentifier nodeId = nodeResponse.getNodeId(); - Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = dtoMap.get(nodeId); + Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = entityMap.get(nodeRpgEntity.getId()); /* outer map is keyed by entity id, not node id; looking up by nodeId never matched and dropped every node but the last */ if (innerMap == null) { innerMap = new HashMap<>(); - dtoMap.put(nodeRpgEntity.getId(), innerMap); + entityMap.put(nodeRpgEntity.getId(), innerMap); } - innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity.getComponent()); + innerMap.put(nodeId, nodeRpgEntity); } } - final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger(); - for (final RemoteProcessGroupEntity entity : rpgEntities) { - final String componentId = entity.getId(); - final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = dtoMap.get(componentId); - - rpgMerger.mergeResponses(entity.getComponent(), mergeMap, successfulResponses, problematicResponses); - } + RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, entityMap); // create a new client response return new NodeResponse(clientResponse, responseEntity); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java index 245d3bd..b3a7ccc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java @@ -17,20 +17,20 @@ package org.apache.nifi.cluster.coordination.http.endpoints; +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.ReportingTaskEntityMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import java.net.URI; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; -public class ReportingTaskEndpointMerger extends AbstractSingleDTOEndpoint<ReportingTaskEntity, ReportingTaskDTO> { - public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; - public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}"); +public class ReportingTaskEndpointMerger extends AbstractSingleEntityEndpoint<ReportingTaskEntity> implements EndpointResponseMerger { + public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks"; + public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}"); @Override public boolean canHandle(URI uri, String method) { @@ -49,32 +49,7 @@ public class ReportingTaskEndpointMerger extends AbstractSingleDTOEndpoint<Repor } @Override 
- protected ReportingTaskDTO getDto(ReportingTaskEntity entity) { - return entity.getComponent(); + protected void mergeResponses(ReportingTaskEntity clientEntity, Map<NodeIdentifier, ReportingTaskEntity> entityMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + ReportingTaskEntityMerger.mergeReportingTasks(clientEntity, entityMap); } - - @Override - protected void mergeResponses(ReportingTaskDTO clientDto, Map<NodeIdentifier, ReportingTaskDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); - - int activeThreadCount = 0; - for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : dtoMap.entrySet()) { - final NodeIdentifier nodeId = nodeEntry.getKey(); - final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue(); - - if (nodeReportingTask.getActiveThreadCount() != null) { - activeThreadCount += nodeReportingTask.getActiveThreadCount(); - } - - // merge the validation errors - mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors()); - } - - // set the merged active thread counts - clientDto.setActiveThreadCount(activeThreadCount); - - // set the merged the validation errors - clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size())); - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java index b82b24a..a5390e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java @@ -17,18 +17,20 @@ package org.apache.nifi.cluster.coordination.http.endpoints; +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.ReportingTasksEntityMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ReportingTasksEntity; import java.net.URI; +import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -public class ReportingTasksEndpointMerger extends AbstractMultiEntityEndpoint<ReportingTasksEntity, ReportingTaskEntity> { - public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; +public class ReportingTasksEndpointMerger implements EndpointResponseMerger { + public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks"; @Override public boolean canHandle(URI uri, String method) { @@ -36,28 +38,34 @@ public class ReportingTasksEndpointMerger extends AbstractMultiEntityEndpoint<Re } @Override - protected Class<ReportingTasksEntity> getEntityClass() { - return ReportingTasksEntity.class; - } + public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse 
clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } - @Override - protected Set<ReportingTaskEntity> getDtos(ReportingTasksEntity entity) { - return entity.getReportingTasks(); - } + final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class); + final Set<ReportingTaskEntity> reportingTasksEntities = responseEntity.getReportingTasks(); - @Override - protected String getComponentId(ReportingTaskEntity entity) { - return entity.getComponent().getId(); - } + final Map<String, Map<NodeIdentifier, ReportingTaskEntity>> entityMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final ReportingTasksEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class); + final Set<ReportingTaskEntity> nodeReportingTaskEntities = nodeResponseEntity.getReportingTasks(); - @Override - protected void mergeResponses(ReportingTaskEntity entity, Map<NodeIdentifier, ReportingTaskEntity> entityMap, - Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - - new ReportingTaskEndpointMerger().mergeResponses( - entity.getComponent(), - entityMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getComponent())), - successfulResponses, - problematicResponses); + for (final ReportingTaskEntity nodeReportingTaskEntity : nodeReportingTaskEntities) { + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + Map<NodeIdentifier, ReportingTaskEntity> innerMap = entityMap.get(nodeReportingTaskEntity.getId()); + if (innerMap == null) { + innerMap = new HashMap<>(); + entityMap.put(nodeReportingTaskEntity.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeReportingTaskEntity); + } + } + + 
ReportingTasksEntityMerger.mergeReportingTasks(reportingTasksEntities, entityMap); + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java new file mode 100644 index 0000000..53bff3b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.BulletinDTO; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public final class BulletinMerger { + + private BulletinMerger() {} + + /** + * Merges the bulletins from every node into one list, tagging each bulletin with its node's "address:port" and sorting by timestamp and then by node address. + * + * @param bulletins the bulletins reported by each node, keyed by node identifier + */ + public static List<BulletinDTO> mergeBulletins(final Map<NodeIdentifier, List<BulletinDTO>> bulletins) { + final List<BulletinDTO> bulletinDtos = new ArrayList<>(); + + for (final Map.Entry<NodeIdentifier, List<BulletinDTO>> entry : bulletins.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final List<BulletinDTO> nodeBulletins = entry.getValue(); + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + + for (final BulletinDTO bulletin : nodeBulletins) { + bulletin.setNodeAddress(nodeAddress); + bulletinDtos.add(bulletin); + } + } + + Collections.sort(bulletinDtos, (BulletinDTO o1, BulletinDTO o2) -> { + final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); + if (timeComparison != 0) { + return timeComparison; + } + + return o1.getNodeAddress().compareTo(o2.getNodeAddress()); + }); + + return bulletinDtos; + } + + /** + * Normalizes the validation errors. 
+ * + * @param validationErrorMap validation errors for each node + * @param totalNodes total number of nodes + * @return the normalized validation errors + */ + public static Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) { + final Set<String> normalizedValidationErrors = new HashSet<>(); + for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) { + final String msg = validationEntry.getKey(); + final Set<NodeIdentifier> nodeIds = validationEntry.getValue(); + + if (nodeIds.size() == totalNodes) { + normalizedValidationErrors.add(msg); + } else { + nodeIds.forEach(id -> normalizedValidationErrors.add(id.getApiAddress() + ":" + id.getApiPort() + " -- " + msg)); + } + } + return normalizedValidationErrors; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java new file mode 100644 index 0000000..566b1de --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.entity.ComponentEntity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ComponentEntityMerger { + + /** + * Merges the ComponentEntity responses. + * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergeComponents(final ComponentEntity clientEntity, final Map<NodeIdentifier, ? extends ComponentEntity> entityMap) { + final Map<NodeIdentifier, List<BulletinDTO>> bulletinDtos = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, ? 
extends ComponentEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeIdentifier = entry.getKey(); + final ComponentEntity entity = entry.getValue(); + + // consider the bulletins if present and authorized + if (entity.getBulletins() != null) { + entity.getBulletins().forEach(bulletin -> { + bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); + }); + } + } + clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java new file mode 100644 index 0000000..75dea4c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ConnectionEntity; + +import java.util.Map; + +public class ConnectionEntityMerger { + + /** + * Merges the ConnectionEntity responses. + * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergeConnections(final ConnectionEntity clientEntity, final Map<NodeIdentifier, ConnectionEntity> entityMap) { + for (final Map.Entry<NodeIdentifier, ConnectionEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ConnectionEntity entity = entry.getValue(); + if (entity != clientEntity) { + StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java new file mode 100644 index 0000000..42a9350 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ConnectionEntity; + +import java.util.Map; +import java.util.Set; + +public class ConnectionsEntityMerger { + + /** + * Merges multiple ConnectionEntity responses. 
+ * + * @param connectionEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeConnections(final Set<ConnectionEntity> connectionEntities, final Map<String, Map<NodeIdentifier, ConnectionEntity>> entityMap) { + for (final ConnectionEntity entity : connectionEntities) { + ConnectionEntityMerger.mergeConnections(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java new file mode 100644 index 0000000..e9e542e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class ControllerServiceEntityMerger { + + /** + * Merges the ControllerServiceEntity responses. + * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergeControllerServices(final ControllerServiceEntity clientEntity, final Map<NodeIdentifier, ControllerServiceEntity> entityMap) { + final ControllerServiceDTO clientDto = clientEntity.getComponent(); + final Map<NodeIdentifier, ControllerServiceDTO> dtoMap = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, ControllerServiceEntity> entry : entityMap.entrySet()) { + final ControllerServiceEntity nodeControllerServiceEntity = entry.getValue(); + final ControllerServiceDTO nodeControllerServiceDto = nodeControllerServiceEntity.getComponent(); + dtoMap.put(entry.getKey(), nodeControllerServiceDto); + } + + ComponentEntityMerger.mergeComponents(clientEntity, entityMap); + + mergeDtos(clientDto, dtoMap); + } + + private static void mergeDtos(final ControllerServiceDTO clientDto, final Map<NodeIdentifier, ControllerServiceDTO> dtoMap) { + // if unauthorized for the client dto, simple return + if (clientDto == null) { + return; + } + + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + final Set<ControllerServiceReferencingComponentEntity> referencingComponents = 
clientDto.getReferencingComponents(); + final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> nodeReferencingComponentsMap = new HashMap<>(); + + String state = null; + for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : dtoMap.entrySet()) { + final ControllerServiceDTO nodeControllerService = nodeEntry.getValue(); + + // consider the node controller service if authorized + if (nodeControllerService != null) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) { + state = ControllerServiceState.DISABLING.name(); + } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) { + state = ControllerServiceState.ENABLING.name(); + } + } + + nodeReferencingComponentsMap.put(nodeId, nodeControllerService.getReferencingComponents()); + + // merge the validation errors + ErrorMerger.mergeErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors()); + } + } + + // merge the referencing components + mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap); + + // store the 'transition' state if applicable + if (state != null) { + clientDto.setState(state); + } + + // set the merged validation errors + clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); + } + + public static void mergeControllerServiceReferences(Set<ControllerServiceReferencingComponentEntity> referencingComponents, + Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> referencingComponentMap) { + + final Map<String, Integer> activeThreadCounts = new HashMap<>(); + final Map<String, String> states = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> nodeEntry : referencingComponentMap.entrySet()) { + final Set<ControllerServiceReferencingComponentEntity> 
nodeReferencingComponents = nodeEntry.getValue(); + + // go through all the nodes referencing components + if (nodeReferencingComponents != null) { + for (final ControllerServiceReferencingComponentEntity nodeReferencingComponentEntity : nodeReferencingComponents) { + final ControllerServiceReferencingComponentDTO nodeReferencingComponent = nodeReferencingComponentEntity.getComponent(); + + // handle active thread counts + if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) { + final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId()); + if (current == null) { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount()); + } else { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current); + } + } + + // handle controller service state + final String state = states.get(nodeReferencingComponent.getId()); + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name()); + } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name()); + } + } + } + } + } + + // go through each referencing components + for (final ControllerServiceReferencingComponentEntity referencingComponent : referencingComponents) { + final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); + if (activeThreadCount != null) { + referencingComponent.getComponent().setActiveThreadCount(activeThreadCount); + } + + final String state = states.get(referencingComponent.getId()); + if (state != null) { + referencingComponent.getComponent().setState(state); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java new file mode 100644 index 0000000..ca97af6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; + +import java.util.Map; +import java.util.Set; + +public class ControllerServicesEntityMerger { + + /** + * Merges multiple ControllerServiceEntity responses. 
+ * + * @param controllerServiceEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeControllerServices(final Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, Map<NodeIdentifier, ControllerServiceEntity>> entityMap) { + for (final ControllerServiceEntity entity : controllerServiceEntities) { + ControllerServiceEntityMerger.mergeControllerServices(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java new file mode 100644 index 0000000..1b51e16 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public final class ErrorMerger { + + private ErrorMerger() {} + + /** + * Merges the validation or authorization errors. + * + * @param validationErrorMap errors for each node + * @param nodeId node id + * @param nodeErrors node errors + */ + public static void mergeErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeErrors) { + if (nodeErrors != null) { + nodeErrors.stream().forEach( + err -> validationErrorMap.computeIfAbsent(err, k -> new HashSet<NodeIdentifier>()) + .add(nodeId)); + } + } + + /** + * Normalizes the validation errors. + * + * @param errorMap validation errors for each node + * @param totalNodes total number of nodes + * @return the normalized validation errors + */ + public static Set<String> normalizedMergedErrors(final Map<String, Set<NodeIdentifier>> errorMap, int totalNodes) { + final Set<String> normalizedErrors = new HashSet<>(); + for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : errorMap.entrySet()) { + final String msg = validationEntry.getKey(); + final Set<NodeIdentifier> nodeIds = validationEntry.getValue(); + + if (nodeIds.size() == totalNodes) { + normalizedErrors.add(msg); + } else { + nodeIds.forEach(id -> normalizedErrors.add(id.getApiAddress() + ":" + id.getApiPort() + " -- " + msg)); + } + } + return normalizedErrors; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java new file mode 100644 index 0000000..37c922f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.entity.PortEntity; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class PortEntityMerger { + + /** + * Merges the PortEntity responses. 
+ * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergePorts(final PortEntity clientEntity, final Map<NodeIdentifier, PortEntity> entityMap) { + final PortDTO clientDto = clientEntity.getComponent(); + final Map<NodeIdentifier, PortDTO> dtoMap = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, PortEntity> entry : entityMap.entrySet()) { + final PortEntity nodePortEntity = entry.getValue(); + final PortDTO nodePortDto = nodePortEntity.getComponent(); + dtoMap.put(entry.getKey(), nodePortDto); + } + + for (final Map.Entry<NodeIdentifier, PortEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final PortEntity entity = entry.getValue(); + if (entity != clientEntity) { + StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + ComponentEntityMerger.mergeComponents(clientEntity, entityMap); + + mergeDtos(clientDto, dtoMap); + } + + private static void mergeDtos(final PortDTO clientDto, final Map<NodeIdentifier, PortDTO> dtoMap) { + // if unauthorized for the client dto, simple return + if (clientDto == null) { + return; + } + + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + + for (final Map.Entry<NodeIdentifier, PortDTO> nodeEntry : dtoMap.entrySet()) { + final PortDTO nodePort = nodeEntry.getValue(); + + // merge the validation errors if authorized + if (nodePort != null) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + ErrorMerger.mergeErrors(validationErrorMap, nodeId, nodePort.getValidationErrors()); + } + } + + // set the merged the validation errors + clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java new file mode 100644 index 0000000..894ce5b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.PortEntity; + +import java.util.Map; +import java.util.Set; + +public class PortsEntityMerger { + + /** + * Merges multiple PortEntity responses. 
+ * + * @param portEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergePorts(final Set<PortEntity> portEntities, final Map<String, Map<NodeIdentifier, PortEntity>> entityMap) { + for (final PortEntity entity : portEntities) { + PortEntityMerger.mergePorts(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java new file mode 100644 index 0000000..a9a1b40 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; + +import java.util.Map; + +public class ProcessGroupEntityMerger { + + /** + * Merges the ProcessorGroupEntity responses. + * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergeProcessGroups(final ProcessGroupEntity clientEntity, final Map<NodeIdentifier, ProcessGroupEntity> entityMap) { + for (final Map.Entry<NodeIdentifier, ProcessGroupEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessGroupEntity entity = entry.getValue(); + if (entity != clientEntity) { + StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + ComponentEntityMerger.mergeComponents(clientEntity, entityMap); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java new file mode 100644 index 0000000..6a0e519 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; + +import java.util.Map; +import java.util.Set; + +public class ProcessGroupsEntityMerger { + + /** + * Merges multiple ProcessGroupEntity responses. 
+ * + * @param processGroupEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeProcessGroups(final Set<ProcessGroupEntity> processGroupEntities, final Map<String, Map<NodeIdentifier, ProcessGroupEntity>> entityMap) { + for (final ProcessGroupEntity entity : processGroupEntities) { + ProcessGroupEntityMerger.mergeProcessGroups(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java new file mode 100644 index 0000000..e730791 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.entity.ProcessorEntity; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class ProcessorEntityMerger { + + /** + * Merges the ProcessorEntity responses. + * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergeProcessors(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) { + final ProcessorDTO clientDto = clientEntity.getComponent(); + final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { + final ProcessorEntity nodeProcEntity = entry.getValue(); + final ProcessorDTO nodeProcDto = nodeProcEntity.getComponent(); + dtoMap.put(entry.getKey(), nodeProcDto); + } + + for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessorEntity entity = entry.getValue(); + if (entity != clientEntity) { + StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + ComponentEntityMerger.mergeComponents(clientEntity, entityMap); + + mergeDtos(clientDto, dtoMap); + } + + private static void mergeDtos(final ProcessorDTO clientDto, final Map<NodeIdentifier, ProcessorDTO> dtoMap) { + // if unauthorized for the client dto, simple return + if (clientDto == null) { + return; + } + + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + + for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : dtoMap.entrySet()) { + final ProcessorDTO nodeProcessor = 
nodeEntry.getValue(); + + // merge the validation errors, if authorized + if (nodeProcessor != null) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + ErrorMerger.mergeErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors()); + } + } + + // set the merged the validation errors + clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java new file mode 100644 index 0000000..cf4ef7d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ProcessorEntity; + +import java.util.Map; +import java.util.Set; + +public class ProcessorsEntityMerger { + + /** + * Merges multiple ProcessorEntity responses. + * + * @param processorEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeProcessors(final Set<ProcessorEntity> processorEntities, final Map<String, Map<NodeIdentifier, ProcessorEntity>> entityMap) { + for (final ProcessorEntity entity : processorEntities) { + ProcessorEntityMerger.mergeProcessors(entity, entityMap.get(entity.getId())); + } + } +}