http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
index bef75a0..6b9b080 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
@@ -17,22 +17,18 @@
 
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.ProcessGroupEntityMerger;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 
-public class ProcessGroupEndpointMerger implements EndpointResponseMerger {
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ProcessGroupEndpointMerger extends 
AbstractSingleEntityEndpoint<ProcessGroupEntity> implements 
EndpointResponseMerger {
     public static final Pattern PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
 
     @Override
@@ -41,67 +37,12 @@ public class ProcessGroupEndpointMerger implements 
EndpointResponseMerger {
     }
 
     @Override
-    public NodeResponse merge(final URI uri, final String method, final 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
-        if (!canHandle(uri, method)) {
-            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
-        }
-
-        final ProcessGroupEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
-        final ProcessGroupDTO responseDto = responseEntity.getComponent();
-
-        final FlowSnippetDTO contents = responseDto.getContents();
-        if (contents == null) {
-            return new NodeResponse(clientResponse, responseEntity);
-        } else {
-            final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap 
= new HashMap<>();
-            final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> 
remoteProcessGroupMap = new HashMap<>();
-
-            for (final NodeResponse nodeResponse : successfulResponses) {
-                final ProcessGroupEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
-                final ProcessGroupDTO nodeProcessGroup = 
nodeResponseEntity.getComponent();
-
-                for (final ProcessorDTO nodeProcessor : 
nodeProcessGroup.getContents().getProcessors()) {
-                    Map<NodeIdentifier, ProcessorDTO> innerMap = 
processorMap.get(nodeProcessor.getId());
-                    if (innerMap == null) {
-                        innerMap = new HashMap<>();
-                        processorMap.put(nodeProcessor.getId(), innerMap);
-                    }
-
-                    innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
-                }
-
-                for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : 
nodeProcessGroup.getContents().getRemoteProcessGroups()) {
-                    Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = 
remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
-                    if (innerMap == null) {
-                        innerMap = new HashMap<>();
-                        
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
-                    }
-
-                    innerMap.put(nodeResponse.getNodeId(), 
nodeRemoteProcessGroup);
-                }
-            }
-
-            final ProcessorEndpointMerger procMerger = new 
ProcessorEndpointMerger();
-            for (final ProcessorDTO processor : contents.getProcessors()) {
-                final String procId = processor.getId();
-                final Map<NodeIdentifier, ProcessorDTO> mergeMap = 
processorMap.get(procId);
-
-                procMerger.mergeResponses(processor, mergeMap, 
successfulResponses, problematicResponses);
-            }
-
-            final RemoteProcessGroupEndpointMerger rpgMerger = new 
RemoteProcessGroupEndpointMerger();
-            for (final RemoteProcessGroupDTO remoteProcessGroup : 
contents.getRemoteProcessGroups()) {
-                if (remoteProcessGroup.getContents() != null) {
-                    final String remoteProcessGroupId = 
remoteProcessGroup.getId();
-                    final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap 
= remoteProcessGroupMap.get(remoteProcessGroupId);
-
-                    rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, 
successfulResponses, problematicResponses);
-                }
-            }
-        }
-
-        // create a new client response
-        return new NodeResponse(clientResponse, responseEntity);
+    protected Class<ProcessGroupEntity> getEntityClass() {
+        return ProcessGroupEntity.class;
     }
 
+    @Override
+    protected void mergeResponses(ProcessGroupEntity clientEntity, 
Map<NodeIdentifier, ProcessGroupEntity> entityMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        ProcessGroupEntityMerger.mergeProcessGroups(clientEntity, entityMap);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java
new file mode 100644
index 0000000..d4f047e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupsEndpointMerger.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.ProcessGroupsEntityMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Merges the responses that each cluster node returned for a request listing the child process groups
+ * of a process group into the single response handed back to the client.
+ */
+public class ProcessGroupsEndpointMerger implements EndpointResponseMerger {
+    public static final Pattern PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/process-groups");
+
+    /**
+     * Accepts only GET requests whose path matches {@link #PROCESS_GROUPS_URI_PATTERN}.
+     */
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    /**
+     * Groups the process group entities of every successful node response by entity id and hands the
+     * grouping to {@link ProcessGroupsEntityMerger#mergeProcessGroups}.
+     *
+     * @param uri the URI of the replicated request
+     * @param method the HTTP method of the replicated request
+     * @param successfulResponses the node responses whose entities are grouped and merged
+     * @param problematicResponses not read by this merger
+     * @param clientResponse the node response whose entity is used as the basis of the result
+     * @return a new response built from {@code clientResponse} and its entity
+     * @throws IllegalArgumentException if {@link #canHandle(URI, String)} rejects the URI and method
+     */
+    @Override
+    public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method);
+        }
+
+        final ProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupsEntity.class);
+        final Set<ProcessGroupEntity> processGroupEntities = responseEntity.getProcessGroups();
+
+        // process group id -> (node -> that node's copy of the process group entity)
+        final Map<String, Map<NodeIdentifier, ProcessGroupEntity>> entityMap = new HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final ProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupsEntity.class);
+            final Set<ProcessGroupEntity> nodeProcessGroupEntities = nodeResponseEntity.getProcessGroups();
+
+            for (final ProcessGroupEntity nodeProcessGroupEntity : nodeProcessGroupEntities) {
+                final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                // The outer map is keyed by entity id; looking it up by NodeIdentifier would never match.
+                Map<NodeIdentifier, ProcessGroupEntity> innerMap = entityMap.get(nodeProcessGroupEntity.getId());
+                if (innerMap == null) {
+                    innerMap = new HashMap<>();
+                    entityMap.put(nodeProcessGroupEntity.getId(), innerMap);
+                }
+
+                innerMap.put(nodeId, nodeProcessGroupEntity);
+            }
+        }
+
+        ProcessGroupsEntityMerger.mergeProcessGroups(processGroupEntities, entityMap);
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
index bd50eca..b8202c8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
@@ -17,28 +17,24 @@
 
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.manager.ProcessorEntityMerger;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 public class ProcessorEndpointMerger extends 
AbstractSingleEntityEndpoint<ProcessorEntity> implements EndpointResponseMerger 
{
-    public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/processors");
+    public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
     public static final Pattern PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}");
-    public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
 
     @Override
     public boolean canHandle(final URI uri, final String method) {
-        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method))
-            && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || 
CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
             return true;
         } else if ("POST".equalsIgnoreCase(method) && 
PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
             return true;
@@ -53,42 +49,10 @@ public class ProcessorEndpointMerger extends 
AbstractSingleEntityEndpoint<Proces
     }
 
 
-    protected void mergeResponses(final ProcessorDTO clientDto, final 
Map<NodeIdentifier, ProcessorDTO> dtoMap, final Set<NodeResponse> 
successfulResponses,
-        final Set<NodeResponse> problematicResponses) {
-        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
-
-        for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : 
dtoMap.entrySet()) {
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final ProcessorDTO nodeProcessor = nodeEntry.getValue();
-
-            // merge the validation errors
-            mergeValidationErrors(validationErrorMap, nodeId, 
nodeProcessor.getValidationErrors());
-        }
-
-        // set the merged the validation errors
-        
clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 dtoMap.size()));
-    }
-
     @Override
     protected void mergeResponses(final ProcessorEntity clientEntity, final 
Map<NodeIdentifier, ProcessorEntity> entityMap, final Set<NodeResponse> 
successfulResponses,
         final Set<NodeResponse> problematicResponses) {
 
-        final ProcessorDTO clientDto = clientEntity.getComponent();
-        final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>();
-        for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : 
entityMap.entrySet()) {
-            final ProcessorEntity nodeProcEntity = entry.getValue();
-            final ProcessorDTO nodeProcDto = nodeProcEntity.getComponent();
-            dtoMap.put(entry.getKey(), nodeProcDto);
-        }
-
-        for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : 
entityMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
-            final ProcessorEntity entity = entry.getValue();
-            if (entity != clientEntity) {
-                StatusMerger.merge(clientEntity.getStatus(), 
entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort());
-            }
-        }
-
-        mergeResponses(clientDto, dtoMap, successfulResponses, 
problematicResponses);
+        ProcessorEntityMerger.mergeProcessors(clientEntity, entityMap);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
index fa076b9..da84c58 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
@@ -17,21 +17,21 @@
 
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.ProcessorsEntityMerger;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
 
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 public class ProcessorsEndpointMerger implements EndpointResponseMerger {
-    public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
+    public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
 
     @Override
     public boolean canHandle(final URI uri, final String method) {
@@ -47,30 +47,25 @@ public class ProcessorsEndpointMerger implements EndpointResponseMerger {
         final ProcessorsEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorsEntity.class);
         final Set<ProcessorEntity> processorEntities = responseEntity.getProcessors();
 
-        final Map<String, Map<NodeIdentifier, ProcessorDTO>> dtoMap = new HashMap<>();
+        final Map<String, Map<NodeIdentifier, ProcessorEntity>> entityMap = new HashMap<>();
         for (final NodeResponse nodeResponse : successfulResponses) {
             final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
             final Set<ProcessorEntity> nodeProcessorEntities = nodeResponseEntity.getProcessors();
 
             for (final ProcessorEntity nodeProcessorEntity : nodeProcessorEntities) {
                 final NodeIdentifier nodeId = nodeResponse.getNodeId();
-                Map<NodeIdentifier, ProcessorDTO> innerMap = dtoMap.get(nodeId);
+                // The outer map is keyed by entity id; looking it up by NodeIdentifier would never match.
+                Map<NodeIdentifier, ProcessorEntity> innerMap = entityMap.get(nodeProcessorEntity.getId());
                 if (innerMap == null) {
                     innerMap = new HashMap<>();
-                    dtoMap.put(nodeProcessorEntity.getId(), innerMap);
+                    entityMap.put(nodeProcessorEntity.getId(), innerMap);
                 }
 
-                innerMap.put(nodeResponse.getNodeId(), nodeProcessorEntity.getComponent());
+                innerMap.put(nodeResponse.getNodeId(), nodeProcessorEntity);
             }
         }
 
-        final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger();
-        for (final ProcessorEntity entity : processorEntities) {
-            final String componentId = entity.getId();
-            final Map<NodeIdentifier, ProcessorDTO> mergeMap = dtoMap.get(componentId);
-
-            procMerger.mergeResponses(entity.getComponent(), mergeMap, successfulResponses, problematicResponses);
-        }
+        ProcessorsEntityMerger.mergeProcessors(processorEntities, entityMap);
 
         // create a new client response
         return new NodeResponse(clientResponse, responseEntity);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
index 732d527..e130e5e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
@@ -17,24 +17,19 @@
 
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.RemoteProcessGroupEntityMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
-
-public class RemoteProcessGroupEndpointMerger extends 
AbstractSingleDTOEndpoint<RemoteProcessGroupEntity, RemoteProcessGroupDTO> {
-    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/remote-process-groups");
+public class RemoteProcessGroupEndpointMerger extends 
AbstractSingleEntityEndpoint<RemoteProcessGroupEntity> implements 
EndpointResponseMerger {
+    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
     public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/remote-process-groups/[a-f0-9\\-]{36}");
 
     @Override
@@ -54,76 +49,9 @@ public class RemoteProcessGroupEndpointMerger extends 
AbstractSingleDTOEndpoint<
     }
 
     @Override
-    protected RemoteProcessGroupDTO getDto(final RemoteProcessGroupEntity 
entity) {
-        return entity.getComponent();
-    }
-
-    @Override
-    protected void mergeResponses(RemoteProcessGroupDTO clientDto, 
Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
-        final RemoteProcessGroupContentsDTO remoteProcessGroupContents = 
clientDto.getContents();
-
-        Boolean mergedIsTargetSecure = null;
-        final List<String> mergedAuthorizationIssues = new ArrayList<>();
-        final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new 
HashSet<>();
-        final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new 
HashSet<>();
-
-        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry 
: dtoMap.entrySet()) {
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final RemoteProcessGroupDTO nodeRemoteProcessGroupDto = 
nodeEntry.getValue();
-
-            // merge the issues
-            final List<String> nodeAuthorizationIssues = 
nodeRemoteProcessGroupDto.getAuthorizationIssues();
-            if (nodeAuthorizationIssues != null && 
!nodeAuthorizationIssues.isEmpty()) {
-                for (final String nodeAuthorizationIssue : 
nodeAuthorizationIssues) {
-                    mergedAuthorizationIssues.add(nodeId.getApiAddress() + ":" 
+ nodeId.getApiPort() + " -- " + nodeAuthorizationIssue);
-                }
-            }
-
-            // use the first target secure flag since they will all be the same
-            final Boolean nodeIsTargetSecure = 
nodeRemoteProcessGroupDto.isTargetSecure();
-            if (mergedIsTargetSecure == null) {
-                mergedIsTargetSecure = nodeIsTargetSecure;
-            }
-
-            // merge the ports in the contents
-            final RemoteProcessGroupContentsDTO 
nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroupDto.getContents();
-            if (remoteProcessGroupContents != null && 
nodeRemoteProcessGroupContentsDto != null) {
-                if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) 
{
-                    
mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts());
-                }
-                if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != 
null) {
-                    
mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts());
-                }
-            }
-        }
-
-        if (remoteProcessGroupContents != null) {
-            if (!mergedInputPorts.isEmpty()) {
-                remoteProcessGroupContents.setInputPorts(mergedInputPorts);
-            }
-            if (!mergedOutputPorts.isEmpty()) {
-                remoteProcessGroupContents.setOutputPorts(mergedOutputPorts);
-            }
-        }
-
-        if (mergedIsTargetSecure != null) {
-            clientDto.setTargetSecure(mergedIsTargetSecure);
-        }
-
-        if (!mergedAuthorizationIssues.isEmpty()) {
-            clientDto.setAuthorizationIssues(mergedAuthorizationIssues);
-        }
-    }
-
     protected void mergeResponses(RemoteProcessGroupEntity clientEntity, 
Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap,
-        Set<NodeResponse> successfulResponses, Set<NodeResponse> 
problematicResponses) {
-
-        final RemoteProcessGroupDTO clientDto = clientEntity.getComponent();
-        final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new 
HashMap<>();
-        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : 
entityMap.entrySet()) {
-            dtoMap.put(entry.getKey(), entry.getValue().getComponent());
-        }
+                                  Set<NodeResponse> successfulResponses, 
Set<NodeResponse> problematicResponses) {
 
-        mergeResponses(clientDto, dtoMap, successfulResponses, 
problematicResponses);
+        RemoteProcessGroupEntityMerger.mergeRemoteProcessGroups(clientEntity, 
entityMap);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
index c387951..c95aa9b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
@@ -17,21 +17,21 @@
 
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.RemoteProcessGroupsEntityMerger;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 public class RemoteProcessGroupsEndpointMerger implements 
EndpointResponseMerger {
-    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
+    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
 
     @Override
     public boolean canHandle(final URI uri, final String method) {
@@ -47,30 +47,24 @@ public class RemoteProcessGroupsEndpointMerger implements 
EndpointResponseMerger
         final RemoteProcessGroupsEntity responseEntity = 
clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
         final Set<RemoteProcessGroupEntity> rpgEntities = 
responseEntity.getRemoteProcessGroups();
 
-        final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> dtoMap = 
new HashMap<>();
+        final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> 
entityMap = new HashMap<>();
         for (final NodeResponse nodeResponse : successfulResponses) {
             final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse 
== clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
             final Set<RemoteProcessGroupEntity> nodeRpgEntities = 
nodeResponseEntity.getRemoteProcessGroups();
 
             for (final RemoteProcessGroupEntity nodeRpgEntity : 
nodeRpgEntities) {
                 final NodeIdentifier nodeId = nodeResponse.getNodeId();
-                Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = 
dtoMap.get(nodeId);
+                Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = 
entityMap.get(nodeId);
                 if (innerMap == null) {
                     innerMap = new HashMap<>();
-                    dtoMap.put(nodeRpgEntity.getId(), innerMap);
+                    entityMap.put(nodeRpgEntity.getId(), innerMap);
                 }
 
-                innerMap.put(nodeResponse.getNodeId(), 
nodeRpgEntity.getComponent());
+                innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
             }
         }
 
-        final RemoteProcessGroupEndpointMerger rpgMerger = new 
RemoteProcessGroupEndpointMerger();
-        for (final RemoteProcessGroupEntity entity : rpgEntities) {
-            final String componentId = entity.getId();
-            final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = 
dtoMap.get(componentId);
-
-            rpgMerger.mergeResponses(entity.getComponent(), mergeMap, 
successfulResponses, problematicResponses);
-        }
+        RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, 
entityMap);
 
         // create a new client response
         return new NodeResponse(clientResponse, responseEntity);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
index 245d3bd..b3a7ccc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
@@ -17,20 +17,20 @@
 
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.ReportingTaskEntityMerger;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 
 import java.net.URI;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-public class ReportingTaskEndpointMerger extends 
AbstractSingleDTOEndpoint<ReportingTaskEntity, ReportingTaskDTO> {
-    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
-    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}");
+public class ReportingTaskEndpointMerger  extends 
AbstractSingleEntityEndpoint<ReportingTaskEntity> implements 
EndpointResponseMerger {
+    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks";
+    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}");
 
     @Override
     public boolean canHandle(URI uri, String method) {
@@ -49,32 +49,7 @@ public class ReportingTaskEndpointMerger extends 
AbstractSingleDTOEndpoint<Repor
     }
 
     @Override
-    protected ReportingTaskDTO getDto(ReportingTaskEntity entity) {
-        return entity.getComponent();
+    protected void mergeResponses(ReportingTaskEntity clientEntity, 
Map<NodeIdentifier, ReportingTaskEntity> entityMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        ReportingTaskEntityMerger.mergeReportingTasks(clientEntity, entityMap);
     }
-
-    @Override
-    protected void mergeResponses(ReportingTaskDTO clientDto, 
Map<NodeIdentifier, ReportingTaskDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
-        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
-
-        int activeThreadCount = 0;
-        for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : 
dtoMap.entrySet()) {
-            final NodeIdentifier nodeId = nodeEntry.getKey();
-            final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue();
-
-            if (nodeReportingTask.getActiveThreadCount() != null) {
-                activeThreadCount += nodeReportingTask.getActiveThreadCount();
-            }
-
-            // merge the validation errors
-            mergeValidationErrors(validationErrorMap, nodeId, 
nodeReportingTask.getValidationErrors());
-        }
-
-        // set the merged active thread counts
-        clientDto.setActiveThreadCount(activeThreadCount);
-
-        // set the merged the validation errors
-        
clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 dtoMap.size()));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
index b82b24a..a5390e8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
@@ -17,18 +17,20 @@
 
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.ReportingTasksEntityMerger;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 
 import java.net.URI;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
-public class ReportingTasksEndpointMerger extends 
AbstractMultiEntityEndpoint<ReportingTasksEntity, ReportingTaskEntity> {
-    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
+public class ReportingTasksEndpointMerger  implements EndpointResponseMerger {
+    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks";
 
     @Override
     public boolean canHandle(URI uri, String method) {
@@ -36,28 +38,34 @@ public class ReportingTasksEndpointMerger extends 
AbstractMultiEntityEndpoint<Re
     }
 
     @Override
-    protected Class<ReportingTasksEntity> getEntityClass() {
-        return ReportingTasksEntity.class;
-    }
+    public final NodeResponse merge(final URI uri, final String method, final 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
+        // Merges the reporting task listings of all successful node responses into the client response:
+        // entities are grouped by reporting task id and node, then merged by ReportingTasksEntityMerger.
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
 
-    @Override
-    protected Set<ReportingTaskEntity> getDtos(ReportingTasksEntity entity) {
-        return entity.getReportingTasks();
-    }
+        final ReportingTasksEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+        final Set<ReportingTaskEntity> reportingTasksEntities = 
responseEntity.getReportingTasks();
 
-    @Override
-    protected String getComponentId(ReportingTaskEntity entity) {
-        return entity.getComponent().getId();
-    }
+        final Map<String, Map<NodeIdentifier, ReportingTaskEntity>> entityMap 
= new HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final ReportingTasksEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+            final Set<ReportingTaskEntity> nodeReportingTaskEntities = 
nodeResponseEntity.getReportingTasks();
 
-    @Override
-    protected void mergeResponses(ReportingTaskEntity entity, 
Map<NodeIdentifier, ReportingTaskEntity> entityMap,
-                                  Set<NodeResponse> successfulResponses, 
Set<NodeResponse> problematicResponses) {
-
-        new ReportingTaskEndpointMerger().mergeResponses(
-            entity.getComponent(),
-            
entityMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().getComponent())),
-            successfulResponses,
-            problematicResponses);
+            for (final ReportingTaskEntity nodeReportingTaskEntity : 
nodeReportingTaskEntities) {
+                final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                // the outer map is keyed by reporting task id, so look up by that id; looking up by node id
+                // never matches and would replace the inner map (dropping other nodes' entries) on every pass
+                Map<NodeIdentifier, ReportingTaskEntity> innerMap = 
entityMap.get(nodeReportingTaskEntity.getId());
+                if (innerMap == null) {
+                    innerMap = new HashMap<>();
+                    entityMap.put(nodeReportingTaskEntity.getId(), innerMap);
+                }
+
+                innerMap.put(nodeId, nodeReportingTaskEntity);
+            }
+        }
+
+        ReportingTasksEntityMerger.mergeReportingTasks(reportingTasksEntities, 
entityMap);
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
new file mode 100644
index 0000000..53bff3b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Static helpers for merging per-node bulletins and for normalizing per-node validation errors.
+ */
+public final class BulletinMerger {
+
+    private BulletinMerger() {}
+
+    /**
+     * Merges the bulletins from all nodes into a single list. Each bulletin is tagged with the
+     * "address:port" of the node that reported it (the supplied DTOs are modified in place), and the
+     * result is sorted by timestamp and then by node address.
+     *
+     * @param bulletins bulletins keyed by the node that reported them
+     * @return the bulletins from every node, sorted by timestamp and then by node address
+     */
+    public static List<BulletinDTO> mergeBulletins(final Map<NodeIdentifier, 
List<BulletinDTO>> bulletins) {
+        final List<BulletinDTO> bulletinDtos = new ArrayList<>();
+
+        for (final Map.Entry<NodeIdentifier, List<BulletinDTO>> entry : 
bulletins.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final List<BulletinDTO> nodeBulletins = entry.getValue();
+            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
+
+            for (final BulletinDTO bulletin : nodeBulletins) {
+                bulletin.setNodeAddress(nodeAddress);
+                bulletinDtos.add(bulletin);
+            }
+        }
+
+        // order by timestamp, breaking ties by node address
+        // NOTE(review): assumes every bulletin has a non-null timestamp -- confirm
+        Collections.sort(bulletinDtos, (BulletinDTO o1, BulletinDTO o2) -> {
+            final int timeComparison = 
o1.getTimestamp().compareTo(o2.getTimestamp());
+            if (timeComparison != 0) {
+                return timeComparison;
+            }
+
+            return o1.getNodeAddress().compareTo(o2.getNodeAddress());
+        });
+
+        return bulletinDtos;
+    }
+
+    /**
+     * Normalizes the validation errors. A message reported by exactly totalNodes nodes is kept as is;
+     * any other message is emitted once per reporting node, prefixed with that node's "address:port".
+     *
+     * @param validationErrorMap validation errors for each node
+     * @param totalNodes total number of nodes
+     * @return the normalized validation errors
+     */
+    public static Set<String> normalizedMergedValidationErrors(final 
Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
+        final Set<String> normalizedValidationErrors = new HashSet<>();
+        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : 
validationErrorMap.entrySet()) {
+            final String msg = validationEntry.getKey();
+            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
+
+            if (nodeIds.size() == totalNodes) {
+                // reported by all nodes, no need to qualify the message
+                normalizedValidationErrors.add(msg);
+            } else {
+                // reported by only some nodes, qualify the message with each node's address
+                nodeIds.forEach(id -> 
normalizedValidationErrors.add(id.getApiAddress() + ":" + id.getApiPort() + " 
-- " + msg));
+            }
+        }
+        return normalizedValidationErrors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
new file mode 100644
index 0000000..566b1de
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.entity.ComponentEntity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Merges the parts of a ComponentEntity that are handled the same way for every component type (the bulletins).
+ */
+public class ComponentEntityMerger {
+
+    /**
+     * Merges the bulletins of the ComponentEntity responses. Bulletins from every node entity that has them are
+     * grouped by node, merged via BulletinMerger and set on the client entity.
+     *
+     * @param clientEntity the entity being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeComponents(final ComponentEntity clientEntity, 
final Map<NodeIdentifier, ? extends ComponentEntity> entityMap) {
+        final Map<NodeIdentifier, List<BulletinDTO>> bulletinDtos = new 
HashMap<>();
+        for (final Map.Entry<NodeIdentifier, ? extends ComponentEntity> entry 
: entityMap.entrySet()) {
+            final NodeIdentifier nodeIdentifier = entry.getKey();
+            final ComponentEntity entity = entry.getValue();
+
+            // consider the bulletins if present and authorized
+            if (entity.getBulletins() != null) {
+                entity.getBulletins().forEach(bulletin -> {
+                    bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new 
ArrayList<>()).add(bulletin);
+                });
+            }
+        }
+        // always replaces the client's bulletins, with an empty list when no node reported any
+        clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
new file mode 100644
index 0000000..75dea4c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+
+import java.util.Map;
+
+/**
+ * Merges the per-node ConnectionEntity responses into the entity returned to the client.
+ */
+public class ConnectionEntityMerger {
+
+    /**
+     * Merges the ConnectionEntity responses by passing each node's status, together with the node's id,
+     * API address and API port, to StatusMerger along with the client entity's status.
+     *
+     * @param clientEntity the entity being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeConnections(final ConnectionEntity clientEntity, 
final Map<NodeIdentifier, ConnectionEntity> entityMap) {
+        for (final Map.Entry<NodeIdentifier, ConnectionEntity> entry : 
entityMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ConnectionEntity entity = entry.getValue();
+            // skip the client entity itself (same instance) so it is not merged into its own status
+            if (entity != clientEntity) {
+                StatusMerger.merge(clientEntity.getStatus(), 
entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java
new file mode 100644
index 0000000..42a9350
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Merges the per-node responses for a set of ConnectionEntity instances.
+ */
+public class ConnectionsEntityMerger {
+
+    /**
+     * Merges multiple ConnectionEntity responses. Each client entity is merged with the node responses
+     * registered in entityMap under that entity's id.
+     *
+     * @param connectionEntities entities being returned to the client
+     * @param entityMap all node responses, keyed by connection id and then by node
+     */
+    public static void mergeConnections(final Set<ConnectionEntity> 
connectionEntities, final Map<String, Map<NodeIdentifier, ConnectionEntity>> 
entityMap) {
+        for (final ConnectionEntity entity : connectionEntities) {
+            // NOTE(review): the lookup result is passed on unchecked and the callee iterates it --
+            // confirm entityMap always holds an entry for every entity id
+            ConnectionEntityMerger.mergeConnections(entity, 
entityMap.get(entity.getId()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java
new file mode 100644
index 0000000..e9e542e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Merges the per-node ControllerServiceEntity responses into the entity returned to the client.
+ */
+public class ControllerServiceEntityMerger {
+
+    /**
+     * Merges the ControllerServiceEntity responses: the bulletins are merged via ComponentEntityMerger and
+     * the component DTOs (state, referencing components, validation errors) are merged afterwards.
+     *
+     * @param clientEntity the entity being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeControllerServices(final ControllerServiceEntity 
clientEntity, final Map<NodeIdentifier, ControllerServiceEntity> entityMap) {
+        final ControllerServiceDTO clientDto = clientEntity.getComponent();
+        final Map<NodeIdentifier, ControllerServiceDTO> dtoMap = new 
HashMap<>();
+        for (final Map.Entry<NodeIdentifier, ControllerServiceEntity> entry : 
entityMap.entrySet()) {
+            final ControllerServiceEntity nodeControllerServiceEntity = 
entry.getValue();
+            final ControllerServiceDTO nodeControllerServiceDto = 
nodeControllerServiceEntity.getComponent();
+            dtoMap.put(entry.getKey(), nodeControllerServiceDto);
+        }
+
+        ComponentEntityMerger.mergeComponents(clientEntity, entityMap);
+
+        mergeDtos(clientDto, dtoMap);
+    }
+
+    /**
+     * Merges the node DTOs into the client DTO. The client state is overwritten when a node reports
+     * DISABLING or ENABLING (the first such node encountered wins), the referencing components are merged,
+     * and the validation errors are replaced by the merged, normalized errors.
+     *
+     * @param clientDto the DTO being returned to the client, may be null
+     * @param dtoMap the DTO reported by each node, values may be null
+     */
+    private static void mergeDtos(final ControllerServiceDTO clientDto, final 
Map<NodeIdentifier, ControllerServiceDTO> dtoMap) {
+        // if unauthorized for the client dto, simply return
+        if (clientDto == null) {
+            return;
+        }
+
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+        final Set<ControllerServiceReferencingComponentEntity> 
referencingComponents = clientDto.getReferencingComponents();
+        final Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentEntity>> nodeReferencingComponentsMap 
= new HashMap<>();
+
+        String state = null;
+        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : 
dtoMap.entrySet()) {
+            final ControllerServiceDTO nodeControllerService = 
nodeEntry.getValue();
+
+            // consider the node controller service if authorized
+            if (nodeControllerService != null) {
+                final NodeIdentifier nodeId = nodeEntry.getKey();
+
+                // only the first transitional state found is recorded
+                if (state == null) {
+                    if 
(ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState()))
 {
+                        state = ControllerServiceState.DISABLING.name();
+                    } else if 
(ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState()))
 {
+                        state = ControllerServiceState.ENABLING.name();
+                    }
+                }
+
+                nodeReferencingComponentsMap.put(nodeId, 
nodeControllerService.getReferencingComponents());
+
+                // merge the validation errors
+                ErrorMerger.mergeErrors(validationErrorMap, nodeId, 
nodeControllerService.getValidationErrors());
+            }
+        }
+
+        // merge the referencing components
+        // NOTE(review): referencingComponents comes straight from the client dto and is iterated by the
+        // callee without a null check -- confirm it can never be null here
+        mergeControllerServiceReferences(referencingComponents, 
nodeReferencingComponentsMap);
+
+        // store the 'transition' state if applicable
+        if (state != null) {
+            clientDto.setState(state);
+        }
+
+        // set the merged validation errors
+        
clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap,
 dtoMap.size()));
+    }
+
+    /**
+     * Merges the referencing components reported by each node into the client's referencing components.
+     * For every referencing component id, the positive active thread counts of all nodes are summed and
+     * the first DISABLING or ENABLING state encountered is recorded; these values are then set on the
+     * client's component with the same id. Client components without a summed count or recorded state
+     * are left unchanged.
+     *
+     * @param referencingComponents the referencing components being returned to the client
+     * @param referencingComponentMap the referencing components reported by each node, values may be null
+     */
+    public static void 
mergeControllerServiceReferences(Set<ControllerServiceReferencingComponentEntity>
 referencingComponents,
+                                                        Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentEntity>> referencingComponentMap) {
+
+        final Map<String, Integer> activeThreadCounts = new HashMap<>();
+        final Map<String, String> states = new HashMap<>();
+        for (final Map.Entry<NodeIdentifier, 
Set<ControllerServiceReferencingComponentEntity>> nodeEntry : 
referencingComponentMap.entrySet()) {
+            final Set<ControllerServiceReferencingComponentEntity> 
nodeReferencingComponents = nodeEntry.getValue();
+
+            // go through all of the node's referencing components
+            if (nodeReferencingComponents != null) {
+                for (final ControllerServiceReferencingComponentEntity 
nodeReferencingComponentEntity : nodeReferencingComponents) {
+                    // NOTE(review): getComponent() is dereferenced without a null check -- confirm it is
+                    // always populated for referencing components
+                    final ControllerServiceReferencingComponentDTO 
nodeReferencingComponent = nodeReferencingComponentEntity.getComponent();
+
+                    // handle active thread counts
+                    if (nodeReferencingComponent.getActiveThreadCount() != 
null && nodeReferencingComponent.getActiveThreadCount() > 0) {
+                        final Integer current = 
activeThreadCounts.get(nodeReferencingComponent.getId());
+                        if (current == null) {
+                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount());
+                        } else {
+                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount() + current);
+                        }
+                    }
+
+                    // handle controller service state
+                    final String state = 
states.get(nodeReferencingComponent.getId());
+                    if (state == null) {
+                        if 
(ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.DISABLING.name());
+                        } else if 
(ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.ENABLING.name());
+                        }
+                    }
+                }
+            }
+        }
+
+        // go through each referencing component
+        for (final ControllerServiceReferencingComponentEntity 
referencingComponent : referencingComponents) {
+            final Integer activeThreadCount = 
activeThreadCounts.get(referencingComponent.getId());
+            if (activeThreadCount != null) {
+                
referencingComponent.getComponent().setActiveThreadCount(activeThreadCount);
+            }
+
+            final String state = states.get(referencingComponent.getId());
+            if (state != null) {
+                referencingComponent.getComponent().setState(state);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java
new file mode 100644
index 0000000..ca97af6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+
+import java.util.Map;
+import java.util.Set;
+
+public class ControllerServicesEntityMerger {
+
+    /**
+     * Merges multiple ControllerServiceEntity responses.
+     *
+     * @param controllerServiceEntities entities being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeControllerServices(final 
Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, 
Map<NodeIdentifier, ControllerServiceEntity>> entityMap) {
+        for (final ControllerServiceEntity entity : controllerServiceEntities) 
{
+            ControllerServiceEntityMerger.mergeControllerServices(entity, 
entityMap.get(entity.getId()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java
new file mode 100644
index 0000000..1b51e16
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ErrorMerger.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public final class ErrorMerger {
+
+    private ErrorMerger() {}
+
+    /**
+     * Merges the validation or authorization errors.
+     *
+     * @param validationErrorMap errors for each node
+     * @param nodeId node id
+     * @param nodeErrors node errors
+     */
+    public static void mergeErrors(final Map<String, Set<NodeIdentifier>> 
validationErrorMap, final NodeIdentifier nodeId, final Collection<String> 
nodeErrors) {
+        if (nodeErrors != null) {
+            nodeErrors.stream().forEach(
+                    err -> validationErrorMap.computeIfAbsent(err, k -> new 
HashSet<NodeIdentifier>())
+                            .add(nodeId));
+        }
+    }
+
+    /**
+     * Normalizes the validation errors.
+     *
+     * @param errorMap validation errors for each node
+     * @param totalNodes total number of nodes
+     * @return the normalized validation errors
+     */
+    public static Set<String> normalizedMergedErrors(final Map<String, 
Set<NodeIdentifier>> errorMap, int totalNodes) {
+        final Set<String> normalizedErrors = new HashSet<>();
+        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : 
errorMap.entrySet()) {
+            final String msg = validationEntry.getKey();
+            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
+
+            if (nodeIds.size() == totalNodes) {
+                normalizedErrors.add(msg);
+            } else {
+                nodeIds.forEach(id -> normalizedErrors.add(id.getApiAddress() 
+ ":" + id.getApiPort() + " -- " + msg));
+            }
+        }
+        return normalizedErrors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
new file mode 100644
index 0000000..37c922f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.entity.PortEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PortEntityMerger {
+
+    /**
+     * Merges the PortEntity responses from all nodes into the entity returned to the client.
+     * Node statuses are merged first, then the component-level merge runs, and finally the
+     * validation errors of the node DTOs are merged into the client DTO.
+     *
+     * @param clientEntity the entity being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergePorts(final PortEntity clientEntity, final Map<NodeIdentifier, PortEntity> entityMap) {
+        // the client DTO is read before the component-level merge below is invoked
+        final PortDTO clientDto = clientEntity.getComponent();
+
+        // gather the DTO reported by each node (possibly null)
+        final Map<NodeIdentifier, PortDTO> dtoMap = new HashMap<>();
+        entityMap.forEach((nodeId, nodeEntity) -> dtoMap.put(nodeId, nodeEntity.getComponent()));
+
+        // merge the status of every entity other than the client entity itself
+        entityMap.forEach((nodeId, nodeEntity) -> {
+            if (nodeEntity == clientEntity) {
+                return;
+            }
+            StatusMerger.merge(clientEntity.getStatus(), nodeEntity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        });
+
+        ComponentEntityMerger.mergeComponents(clientEntity, entityMap);
+
+        mergeDtos(clientDto, dtoMap);
+    }
+
+    /**
+     * Merges the validation errors of all node DTOs and stores the normalized result on the client DTO.
+     *
+     * @param clientDto the DTO being returned to the client, may be null
+     * @param dtoMap the DTO reported by each node; values may be null
+     */
+    private static void mergeDtos(final PortDTO clientDto, final Map<NodeIdentifier, PortDTO> dtoMap) {
+        // a null client dto means the client is unauthorized, so there is nothing to merge into
+        if (clientDto == null) {
+            return;
+        }
+
+        final Map<String, Set<NodeIdentifier>> errorsByMessage = new HashMap<>();
+
+        dtoMap.forEach((nodeId, nodePort) -> {
+            // only nodes that returned a dto (i.e. authorized) contribute validation errors
+            if (nodePort != null) {
+                ErrorMerger.mergeErrors(errorsByMessage, nodeId, nodePort.getValidationErrors());
+            }
+        });
+
+        // set the merged validation errors
+        clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(errorsByMessage, dtoMap.size()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java
new file mode 100644
index 0000000..894ce5b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.PortEntity;
+
+import java.util.Map;
+import java.util.Set;
+
+public class PortsEntityMerger {
+
+    /**
+     * Merges every client-facing PortEntity with the node responses recorded for it.
+     * For each entity in the set, the per-node responses are looked up in {@code entityMap} by the
+     * entity's id and passed on to {@code PortEntityMerger.mergePorts}.
+     *
+     * @param portEntities entities being returned to the client
+     * @param entityMap all node responses, keyed by entity id
+     */
+    public static void mergePorts(final Set<PortEntity> portEntities, final Map<String, Map<NodeIdentifier, PortEntity>> entityMap) {
+        portEntities.forEach(clientEntity -> PortEntityMerger.mergePorts(clientEntity, entityMap.get(clientEntity.getId())));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
new file mode 100644
index 0000000..a9a1b40
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+
+import java.util.Map;
+
+public class ProcessGroupEntityMerger {
+
+    /**
+     * Merges the ProcessGroupEntity responses from all nodes into the entity returned to the client.
+     * Node statuses are merged first, then the component-level merge runs.
+     *
+     * @param clientEntity the entity being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeProcessGroups(final ProcessGroupEntity clientEntity, final Map<NodeIdentifier, ProcessGroupEntity> entityMap) {
+        // merge the status of every entity other than the client entity itself
+        entityMap.forEach((nodeId, nodeEntity) -> {
+            if (nodeEntity == clientEntity) {
+                return;
+            }
+            StatusMerger.merge(clientEntity.getStatus(), nodeEntity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        });
+
+        ComponentEntityMerger.mergeComponents(clientEntity, entityMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java
new file mode 100644
index 0000000..6a0e519
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+
+import java.util.Map;
+import java.util.Set;
+
+public class ProcessGroupsEntityMerger {
+
+    /**
+     * Merges multiple ProcessGroupEntity responses.
+     *
+     * @param processGroupEntities entities being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeProcessGroups(final Set<ProcessGroupEntity> 
processGroupEntities, final Map<String, Map<NodeIdentifier, 
ProcessGroupEntity>> entityMap) {
+        for (final ProcessGroupEntity entity : processGroupEntities) {
+            ProcessGroupEntityMerger.mergeProcessGroups(entity, 
entityMap.get(entity.getId()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
new file mode 100644
index 0000000..e730791
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ProcessorEntityMerger {
+
+    /**
+     * Merges the ProcessorEntity responses from all nodes into the entity returned to the client.
+     * Node statuses are merged first, then the component-level merge runs, and finally the
+     * validation errors of the node DTOs are merged into the client DTO.
+     *
+     * @param clientEntity the entity being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeProcessors(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) {
+        // the client DTO is read before the component-level merge below is invoked
+        final ProcessorDTO clientDto = clientEntity.getComponent();
+
+        // gather the DTO reported by each node (possibly null)
+        final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>();
+        entityMap.forEach((nodeId, nodeEntity) -> dtoMap.put(nodeId, nodeEntity.getComponent()));
+
+        // merge the status of every entity other than the client entity itself
+        entityMap.forEach((nodeId, nodeEntity) -> {
+            if (nodeEntity == clientEntity) {
+                return;
+            }
+            StatusMerger.merge(clientEntity.getStatus(), nodeEntity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        });
+
+        ComponentEntityMerger.mergeComponents(clientEntity, entityMap);
+
+        mergeDtos(clientDto, dtoMap);
+    }
+
+    /**
+     * Merges the validation errors of all node DTOs and stores the normalized result on the client DTO.
+     *
+     * @param clientDto the DTO being returned to the client, may be null
+     * @param dtoMap the DTO reported by each node; values may be null
+     */
+    private static void mergeDtos(final ProcessorDTO clientDto, final Map<NodeIdentifier, ProcessorDTO> dtoMap) {
+        // a null client dto means the client is unauthorized, so there is nothing to merge into
+        if (clientDto == null) {
+            return;
+        }
+
+        final Map<String, Set<NodeIdentifier>> errorsByMessage = new HashMap<>();
+
+        dtoMap.forEach((nodeId, nodeProcessor) -> {
+            // only nodes that returned a dto (i.e. authorized) contribute validation errors
+            if (nodeProcessor != null) {
+                ErrorMerger.mergeErrors(errorsByMessage, nodeId, nodeProcessor.getValidationErrors());
+            }
+        });
+
+        // set the merged validation errors
+        clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(errorsByMessage, dtoMap.size()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java
new file mode 100644
index 0000000..cf4ef7d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+
+import java.util.Map;
+import java.util.Set;
+
+public class ProcessorsEntityMerger {
+
+    /**
+     * Merges multiple ProcessorEntity responses.
+     *
+     * @param processorEntities entities being returned to the client
+     * @param entityMap all node responses
+     */
+    public static void mergeProcessors(final Set<ProcessorEntity> 
processorEntities, final Map<String, Map<NodeIdentifier, ProcessorEntity>> 
entityMap) {
+        for (final ProcessorEntity entity : processorEntities) {
+            ProcessorEntityMerger.mergeProcessors(entity, 
entityMap.get(entity.getId()));
+        }
+    }
+}

Reply via email to