http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java index 8fdceb1..63758e2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java @@ -17,18 +17,21 @@ package org.apache.nifi.cluster.coordination.http.endpoints; -import java.net.URI; -import java.util.ArrayList; -import java.util.Map; -import java.util.regex.Pattern; - +import org.apache.nifi.cluster.manager.ComponentEntityStatusMerger; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.StatusMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.entity.ProcessorStatusEntity; -public class ProcessorStatusEndpointMerger extends AbstractNodeStatusEndpoint<ProcessorStatusEntity, ProcessorStatusDTO> { +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class ProcessorStatusEndpointMerger extends AbstractSingleEntityEndpoint<ProcessorStatusEntity> implements ComponentEntityStatusMerger<ProcessorStatusDTO> { public static final Pattern PROCESSOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status"); @Override @@ -42,17 +45,19 @@ public class ProcessorStatusEndpointMerger extends AbstractNodeStatusEndpoint<Pr } @Override - protected ProcessorStatusDTO getDto(ProcessorStatusEntity entity) { - return entity.getProcessorStatus(); - } + protected void mergeResponses(ProcessorStatusEntity clientEntity, Map<NodeIdentifier, ProcessorStatusEntity> entityMap, Set<NodeResponse> successfulResponses, + Set<NodeResponse> problematicResponses) { + final ProcessorStatusDTO mergedProcessorStatus = clientEntity.getProcessorStatus(); + mergedProcessorStatus.setNodeSnapshots(new ArrayList<>()); - @Override - protected void mergeResponses(ProcessorStatusDTO clientDto, Map<NodeIdentifier, ProcessorStatusDTO> dtoMap, NodeIdentifier selectedNodeId) { - final ProcessorStatusDTO mergedProcessorStatus = clientDto; - mergedProcessorStatus.setNodeSnapshots(new ArrayList<NodeProcessorStatusSnapshotDTO>()); + final NodeIdentifier selectedNodeId = entityMap.entrySet().stream() + .filter(e -> e.getValue() == clientEntity) + .map(e -> e.getKey()) + .findFirst() + .orElse(null); final NodeProcessorStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessorStatusSnapshotDTO(); - selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setStatusSnapshot(mergedProcessorStatus.getAggregateSnapshot().clone()); selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); @@ -60,15 +65,21 @@ public class ProcessorStatusEndpointMerger extends AbstractNodeStatusEndpoint<Pr mergedProcessorStatus.getNodeSnapshots().add(selectedNodeSnapshot); // merge the other nodes - for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : dtoMap.entrySet()) { + for (final Map.Entry<NodeIdentifier, ProcessorStatusEntity> entry : entityMap.entrySet()) { final NodeIdentifier nodeId = entry.getKey(); - final ProcessorStatusDTO nodeProcessorStatus = entry.getValue(); - if (nodeProcessorStatus == clientDto) { + final ProcessorStatusEntity nodeProcessorStatusEntity = entry.getValue(); + final ProcessorStatusDTO nodeProcessorStatus = nodeProcessorStatusEntity.getProcessorStatus(); + if (nodeProcessorStatus == mergedProcessorStatus) { continue; } - StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + mergeStatus(mergedProcessorStatus, clientEntity.getCanRead(), nodeProcessorStatus, nodeProcessorStatusEntity.getCanRead(), nodeId); } } + @Override + public void mergeStatus(ProcessorStatusDTO clientStatus, boolean clientStatusReadablePermission, ProcessorStatusDTO status, boolean statusReadablePermission, NodeIdentifier statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), + statusNodeIdentifier.getApiPort()); + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java index e130e5e..e32c253 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java @@ -31,6 +31,7 @@ import java.util.regex.Pattern; public class RemoteProcessGroupEndpointMerger extends AbstractSingleEntityEndpoint<RemoteProcessGroupEntity> implements EndpointResponseMerger { public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups/[a-f0-9\\-]{36}"); + private final RemoteProcessGroupEntityMerger remoteProcessGroupEntityMerger = new RemoteProcessGroupEntityMerger(); @Override public boolean canHandle(final URI uri, final String method) { @@ -52,6 +53,6 @@ public class RemoteProcessGroupEndpointMerger extends AbstractSingleEntityEndpoi protected void mergeResponses(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - RemoteProcessGroupEntityMerger.mergeRemoteProcessGroups(clientEntity, entityMap); + remoteProcessGroupEntityMerger.merge(clientEntity, entityMap); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java index d2056f7..3f49c59 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java @@ -17,18 +17,21 @@ package org.apache.nifi.cluster.coordination.http.endpoints; -import java.net.URI; -import java.util.ArrayList; -import java.util.Map; -import java.util.regex.Pattern; - +import org.apache.nifi.cluster.manager.ComponentEntityStatusMerger; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.StatusMerger; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; -public class RemoteProcessGroupStatusEndpointMerger extends AbstractNodeStatusEndpoint<RemoteProcessGroupStatusEntity, RemoteProcessGroupStatusDTO> { +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class RemoteProcessGroupStatusEndpointMerger extends AbstractSingleEntityEndpoint<RemoteProcessGroupStatusEntity> implements ComponentEntityStatusMerger<RemoteProcessGroupStatusDTO> { public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status"); @Override @@ -42,17 +45,19 @@ public class RemoteProcessGroupStatusEndpointMerger extends AbstractNodeStatusEn } @Override - protected RemoteProcessGroupStatusDTO getDto(RemoteProcessGroupStatusEntity entity) { - return entity.getRemoteProcessGroupStatus(); - } + protected void mergeResponses(RemoteProcessGroupStatusEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupStatusEntity> entityMap, Set<NodeResponse> successfulResponses, + Set<NodeResponse> problematicResponses) { + final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = clientEntity.getRemoteProcessGroupStatus(); + mergedRemoteProcessGroupStatus.setNodeSnapshots(new ArrayList<>()); - @Override - protected void mergeResponses(RemoteProcessGroupStatusDTO clientDto, Map<NodeIdentifier, RemoteProcessGroupStatusDTO> dtoMap, NodeIdentifier selectedNodeId) { - final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = clientDto; - mergedRemoteProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>()); + final NodeIdentifier selectedNodeId = entityMap.entrySet().stream() + .filter(e -> e.getValue() == clientEntity) + .map(e -> e.getKey()) + .findFirst() + .orElse(null); final NodeRemoteProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); - selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setStatusSnapshot(mergedRemoteProcessGroupStatus.getAggregateSnapshot().clone()); selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); @@ -60,15 +65,22 @@ public class RemoteProcessGroupStatusEndpointMerger extends AbstractNodeStatusEn mergedRemoteProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); // merge the other nodes - for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> entry : dtoMap.entrySet()) { + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusEntity> entry : entityMap.entrySet()) { final NodeIdentifier nodeId = entry.getKey(); - final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = entry.getValue(); - if (nodeRemoteProcessGroupStatus == clientDto) { + final RemoteProcessGroupStatusEntity nodeRemoteProcessGroupStatusEntity = entry.getValue(); + final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = nodeRemoteProcessGroupStatusEntity.getRemoteProcessGroupStatus(); + if (nodeRemoteProcessGroupStatus == mergedRemoteProcessGroupStatus) { continue; } - StatusMerger.merge(mergedRemoteProcessGroupStatus, nodeRemoteProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + mergeStatus(mergedRemoteProcessGroupStatus, clientEntity.getCanRead(), nodeRemoteProcessGroupStatus, nodeRemoteProcessGroupStatusEntity.getCanRead(), nodeId); } } + @Override + public void mergeStatus(RemoteProcessGroupStatusDTO clientStatus, boolean clientStatusReadablePermission, RemoteProcessGroupStatusDTO status, boolean statusReadablePermission, + NodeIdentifier statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), + statusNodeIdentifier.getApiPort()); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java index b3a7ccc..d17459d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java @@ -31,6 +31,7 @@ import java.util.regex.Pattern; public class ReportingTaskEndpointMerger extends AbstractSingleEntityEndpoint<ReportingTaskEntity> implements EndpointResponseMerger { public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}"); + private final ReportingTaskEntityMerger reportingTaskEntityMerger = new ReportingTaskEntityMerger(); @Override public boolean canHandle(URI uri, String method) { @@ -50,6 +51,6 @@ public class ReportingTaskEndpointMerger extends AbstractSingleEntityEndpoint<R @Override protected void mergeResponses(ReportingTaskEntity clientEntity, Map<NodeIdentifier, ReportingTaskEntity> entityMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { - ReportingTaskEntityMerger.mergeReportingTasks(clientEntity, entityMap); + reportingTaskEntityMerger.merge(clientEntity, entityMap); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java index 8d782e9..507c1fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -111,10 +112,15 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { StatusHistoryDTO lastStatusHistory = null; final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(successfulResponses.size()); + LinkedHashMap<String, String> noReadPermissionsComponentDetails = null; for (final NodeResponse nodeResponse : successfulResponses) { final StatusHistoryEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class); final StatusHistoryDTO nodeStatus = nodeResponseEntity.getStatusHistory(); lastStatusHistory = nodeStatus; + if (noReadPermissionsComponentDetails == null && !nodeResponseEntity.getCanRead()) { + // If component details from a history with no read permissions is encountered for the first time, hold on to them to be used in the merged response + noReadPermissionsComponentDetails = nodeStatus.getComponentDetails(); + } final NodeIdentifier nodeId = nodeResponse.getNodeId(); final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO(); @@ -130,12 +136,13 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { clusterStatusHistory.setGenerated(new Date()); clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots); if (lastStatusHistory != null) { - clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails()); + clusterStatusHistory.setComponentDetails(noReadPermissionsComponentDetails == null ? lastStatusHistory.getComponentDetails() : noReadPermissionsComponentDetails); clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors()); } final StatusHistoryEntity clusterEntity = new StatusHistoryEntity(); clusterEntity.setStatusHistory(clusterStatusHistory); + clusterEntity.setCanRead(noReadPermissionsComponentDetails == null); return new NodeResponse(clientResponse, clusterEntity); } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java index 580ba87..f01c1be 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.entity.ComponentEntity; +import org.apache.nifi.web.api.entity.Permissible; import java.util.ArrayList; import java.util.Collections; @@ -29,35 +30,57 @@ import java.util.Map; import static org.apache.nifi.cluster.manager.BulletinMerger.BULLETIN_COMPARATOR; import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_PER_COMPONENT; -public class ComponentEntityMerger { +public interface ComponentEntityMerger<EntityType extends ComponentEntity & Permissible> { /** - * Merges the ComponentEntity responses. + * Merges the ComponentEntity responses according to their {@link org.apache.nifi.web.api.dto.PermissionsDTO}s. Responsible for invoking + * {@link ComponentEntityMerger#mergeComponents(EntityType, Map)}. * * @param clientEntity the entity being returned to the client - * @param entityMap all node responses + * @param entityMap all node responses */ - public static void mergeComponents(final ComponentEntity clientEntity, final Map<NodeIdentifier, ? extends ComponentEntity> entityMap) { - final Map<NodeIdentifier, List<BulletinDTO>> bulletinDtos = new HashMap<>(); - for (final Map.Entry<NodeIdentifier, ? extends ComponentEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeIdentifier = entry.getKey(); - final ComponentEntity entity = entry.getValue(); - - // consider the bulletins if present and authorized - if (entity.getBulletins() != null) { - entity.getBulletins().forEach(bulletin -> { - bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); - }); - } + @SuppressWarnings("unchecked") + default void merge(final EntityType clientEntity, final Map<NodeIdentifier, EntityType> entityMap) { + for (final Map.Entry<NodeIdentifier, EntityType> entry : entityMap.entrySet()) { + final EntityType entity = entry.getValue(); + PermissionsDtoMerger.mergePermissions(clientEntity.getPermissions(), entity.getPermissions()); } - clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos)); - // sort the results - Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR); + if (clientEntity.getPermissions().getCanRead()) { + final Map<NodeIdentifier, List<BulletinDTO>> bulletinDtos = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, ? extends ComponentEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeIdentifier = entry.getKey(); + final ComponentEntity entity = entry.getValue(); + + // consider the bulletins if present and authorized + if (entity.getBulletins() != null) { + entity.getBulletins().forEach(bulletin -> { + bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); + }); + } + } + clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos)); + + // sort the results + Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR); + + // prune the response to only include the max number of bulletins + if (clientEntity.getBulletins().size() > MAX_BULLETINS_PER_COMPONENT) { + clientEntity.setBulletins(clientEntity.getBulletins().subList(0, MAX_BULLETINS_PER_COMPONENT)); + } - // prune the response to only include the max number of bulletins - if (clientEntity.getBulletins().size() > MAX_BULLETINS_PER_COMPONENT) { - clientEntity.setBulletins(clientEntity.getBulletins().subList(0, MAX_BULLETINS_PER_COMPONENT)); + mergeComponents(clientEntity, entityMap); + } else { + clientEntity.setBulletins(null); + clientEntity.setComponent(null); // unchecked warning suppressed } } + + /** + * Performs the merging of the entities. This method should not be called directly, it will be called by {@link ComponentEntityMerger#merge}. + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + default void mergeComponents(final EntityType clientEntity, final Map<NodeIdentifier, EntityType> entityMap) { + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityStatusMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityStatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityStatusMerger.java new file mode 100644 index 0000000..0ed1e8a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityStatusMerger.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +public interface ComponentEntityStatusMerger<StatusDtoType> { + + /** + * Merges status into clientStatus based on the given permissions. + * + * @param clientStatus The status that will be returned to the client after merging + * @param clientStatusReadablePermission The read permission of the status that will be returned to the client after merging + * @param status The status to be merged into the client status + * @param statusReadablePermission The read permission of the status to be merged into the client status + * @param statusNodeIdentifier The {@link NodeIdentifier} of the node from which status was received + */ + void mergeStatus(final StatusDtoType clientStatus, final boolean clientStatusReadablePermission, final StatusDtoType status, + final boolean statusReadablePermission, final NodeIdentifier statusNodeIdentifier); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java index 75dea4c..89ac179 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java @@ -17,25 +17,29 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; import java.util.Map; -public class ConnectionEntityMerger { +public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionEntity>, ComponentEntityStatusMerger<ConnectionStatusDTO> { - /** - * Merges the ConnectionEntity responses. - * - * @param clientEntity the entity being returned to the client - * @param entityMap all node responses - */ - public static void mergeConnections(final ConnectionEntity clientEntity, final Map<NodeIdentifier, ConnectionEntity> entityMap) { - for (final Map.Entry<NodeIdentifier, ConnectionEntity> entry : entityMap.entrySet()) { + @Override + public void merge(ConnectionEntity clientEntity, Map<NodeIdentifier, ConnectionEntity> entityMap) { + ComponentEntityMerger.super.merge(clientEntity, entityMap); + for (Map.Entry<NodeIdentifier, ConnectionEntity> entry : entityMap.entrySet()) { final NodeIdentifier nodeId = entry.getKey(); - final ConnectionEntity entity = entry.getValue(); - if (entity != clientEntity) { - StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + final ConnectionEntity entityStatus = entry.getValue(); + if (entityStatus != clientEntity) { + mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); } } } + + @Override + public void mergeStatus(ConnectionStatusDTO clientStatus, boolean clientStatusReadablePermission, ConnectionStatusDTO status, boolean statusReadablePermission, + NodeIdentifier statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), + statusNodeIdentifier.getApiPort()); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java index 42a9350..dafa3af 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionsEntityMerger.java @@ -24,6 +24,8 @@ import java.util.Set; public class ConnectionsEntityMerger { + private static final ConnectionEntityMerger connectionEntityMerger = new ConnectionEntityMerger(); + /** * Merges multiple ConnectionEntity responses. * @@ -32,7 +34,7 @@ public class ConnectionsEntityMerger { */ public static void mergeConnections(final Set<ConnectionEntity> connectionEntities, final Map<String, Map<NodeIdentifier, ConnectionEntity>> entityMap) { for (final ConnectionEntity entity : connectionEntities) { - ConnectionEntityMerger.mergeConnections(entity, entityMap.get(entity.getId())); + connectionEntityMerger.merge(entity, entityMap.get(entity.getId())); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java index 8c9f0c8..02d8e7e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMerger.java @@ -20,6 +20,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.PermissionsDTO; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; @@ -27,15 +28,16 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -public class ControllerServiceEntityMerger { +public class ControllerServiceEntityMerger implements ComponentEntityMerger<ControllerServiceEntity> { /** * Merges the ControllerServiceEntity responses. * * @param clientEntity the entity being returned to the client - * @param entityMap all node responses + * @param entityMap all node responses */ - public static void mergeControllerServices(final ControllerServiceEntity clientEntity, final Map<NodeIdentifier, ControllerServiceEntity> entityMap) { + @Override + public void mergeComponents(final ControllerServiceEntity clientEntity, final Map<NodeIdentifier, ControllerServiceEntity> entityMap) { final ControllerServiceDTO clientDto = clientEntity.getComponent(); final Map<NodeIdentifier, ControllerServiceDTO> dtoMap = new HashMap<>(); for (final Map.Entry<NodeIdentifier, ControllerServiceEntity> entry : entityMap.entrySet()) { @@ -44,8 +46,6 @@ public class ControllerServiceEntityMerger { dtoMap.put(entry.getKey(), nodeControllerServiceDto); } - ComponentEntityMerger.mergeComponents(clientEntity, entityMap); - mergeDtos(clientDto, dtoMap); } @@ -99,6 +99,7 @@ public class ControllerServiceEntityMerger { final Map<String, Integer> activeThreadCounts = new HashMap<>(); final Map<String, String> states = new HashMap<>(); + final Map<String, PermissionsDTO> canReads = new HashMap<>(); for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> nodeEntry : referencingComponentMap.entrySet()) { final Set<ControllerServiceReferencingComponentEntity> nodeReferencingComponents = nodeEntry.getValue(); @@ -126,6 +127,17 @@ public class ControllerServiceEntityMerger { states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name()); } } + + // handle read permissions + final PermissionsDTO mergedPermissions = canReads.get(nodeReferencingComponent.getId()); + final PermissionsDTO permissions = nodeReferencingComponentEntity.getPermissions(); + if (permissions != null) { + if (mergedPermissions == null) { + canReads.put(nodeReferencingComponent.getId(), permissions); + } else { + PermissionsDtoMerger.mergePermissions(mergedPermissions, permissions); + } + } } } } @@ -133,14 +145,20 @@ public class ControllerServiceEntityMerger { // go through each referencing components if (referencingComponents != null) { for (final ControllerServiceReferencingComponentEntity referencingComponent : referencingComponents) { - final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); - if (activeThreadCount != null) { - referencingComponent.getComponent().setActiveThreadCount(activeThreadCount); - } + final PermissionsDTO permissions = canReads.get(referencingComponent.getId()); + if (permissions != null && permissions.getCanRead() != null && permissions.getCanRead()) { + final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); + if (activeThreadCount != null) { + referencingComponent.getComponent().setActiveThreadCount(activeThreadCount); + } - final String state = states.get(referencingComponent.getId()); - if (state != null) { - referencingComponent.getComponent().setState(state); + final String state = states.get(referencingComponent.getId()); + if (state != null) { + referencingComponent.getComponent().setState(state); + } + } else { + referencingComponent.setPermissions(permissions); + referencingComponent.setComponent(null); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java index ca97af6..cfaabf7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ControllerServicesEntityMerger.java @@ -24,6 +24,8 @@ import java.util.Set; public class ControllerServicesEntityMerger { + private static final ControllerServiceEntityMerger controllerServiceEntityMerger = new ControllerServiceEntityMerger(); + /** * Merges multiple ControllerServiceEntity responses. * @@ -32,7 +34,7 @@ public class ControllerServicesEntityMerger { */ public static void mergeControllerServices(final Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, Map<NodeIdentifier, ControllerServiceEntity>> entityMap) { for (final ControllerServiceEntity entity : controllerServiceEntities) { - ControllerServiceEntityMerger.mergeControllerServices(entity, entityMap.get(entity.getId())); + controllerServiceEntityMerger.merge(entity, entityMap.get(entity.getId())); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelEntityMerger.java new file mode 100644 index 0000000..51402cb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelEntityMerger.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.web.api.entity.FunnelEntity; + +public class FunnelEntityMerger implements ComponentEntityMerger<FunnelEntity> { + // No specific merging needs to be done beyond what the ComponentEntityMerger does by default +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelsEntityMerger.java new file mode 100644 index 0000000..2b5db3d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/FunnelsEntityMerger.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.FunnelEntity; + +import java.util.Map; +import java.util.Set; + +public class FunnelsEntityMerger { + private static final FunnelEntityMerger funnelEntityMerger = new FunnelEntityMerger(); + + /** + * Merges multiple {@link FunnelEntity} responses. + * + * @param funnelEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeFunnels(final Set<FunnelEntity> funnelEntities, final Map<String, Map<NodeIdentifier, FunnelEntity>> entityMap) { + for (final FunnelEntity entity : funnelEntities) { + funnelEntityMerger.merge(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelEntityMerger.java new file mode 100644 index 0000000..aae4726 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelEntityMerger.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.web.api.entity.LabelEntity; + +public class LabelEntityMerger implements ComponentEntityMerger<LabelEntity> { + // No specific merging needs to be done beyond what the ComponentEntityMerger does by default +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelsEntityMerger.java new file mode 100644 index 0000000..4015d20 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/LabelsEntityMerger.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.LabelEntity; + +import java.util.Map; +import java.util.Set; + +public class LabelsEntityMerger { + private static final LabelEntityMerger labelEntityMerger = new LabelEntityMerger(); + + /** + * Merges multiple {@link LabelEntity} responses. + * + * @param labelEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeLabels(final Set<LabelEntity> labelEntities, final Map<String, Map<NodeIdentifier, LabelEntity>> entityMap) { + for (final LabelEntity entity : labelEntities) { + labelEntityMerger.merge(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PermissionsDtoMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PermissionsDtoMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PermissionsDtoMerger.java new file mode 100644 index 0000000..688a594 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PermissionsDtoMerger.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.web.api.dto.PermissionsDTO; + +public class PermissionsDtoMerger { + + /** + * Merges multiple {@link PermissionsDTO}s. + * + * @param mergedEntityPermissions the {@link PermissionsDTO} representing the merged permissions + * @param entityPermissions an {@link PermissionsDTO} to be merged + */ + public static void mergePermissions(PermissionsDTO mergedEntityPermissions, PermissionsDTO entityPermissions) { + if (mergedEntityPermissions.getCanRead() && !entityPermissions.getCanRead()) { + mergedEntityPermissions.setCanRead(false); + } + if (mergedEntityPermissions.getCanWrite() && !entityPermissions.getCanWrite()) { + mergedEntityPermissions.setCanWrite(false); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java index 37c922f..cd73084 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java @@ -18,13 +18,26 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.entity.PortEntity; import java.util.HashMap; import java.util.Map; import java.util.Set; -public class PortEntityMerger { +public class PortEntityMerger implements ComponentEntityMerger<PortEntity>, ComponentEntityStatusMerger<PortStatusDTO> { + + @Override + public void merge(PortEntity clientEntity, Map<NodeIdentifier, PortEntity> entityMap) { + ComponentEntityMerger.super.merge(clientEntity, entityMap); + for (Map.Entry<NodeIdentifier, PortEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final PortEntity entityStatus = entry.getValue(); + if (entityStatus != clientEntity) { + mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); + } + } + } /** * Merges the PortEntity responses. @@ -32,7 +45,8 @@ public class PortEntityMerger { * @param clientEntity the entity being returned to the client * @param entityMap all node responses */ - public static void mergePorts(final PortEntity clientEntity, final Map<NodeIdentifier, PortEntity> entityMap) { + @Override + public void mergeComponents(PortEntity clientEntity, Map<NodeIdentifier, PortEntity> entityMap) { final PortDTO clientDto = clientEntity.getComponent(); final Map<NodeIdentifier, PortDTO> dtoMap = new HashMap<>(); for (final Map.Entry<NodeIdentifier, PortEntity> entry : entityMap.entrySet()) { @@ -41,19 +55,16 @@ public class PortEntityMerger { dtoMap.put(entry.getKey(), nodePortDto); } - for (final Map.Entry<NodeIdentifier, PortEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final PortEntity entity = entry.getValue(); - if (entity != clientEntity) { - StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - ComponentEntityMerger.mergeComponents(clientEntity, entityMap); - mergeDtos(clientDto, dtoMap); } + @Override + public void mergeStatus(PortStatusDTO clientStatus, boolean clientStatusReadablePermission, PortStatusDTO status, boolean statusReadablePermission, NodeIdentifier + statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), + statusNodeIdentifier.getApiPort()); + } + private static void mergeDtos(final PortDTO clientDto, final Map<NodeIdentifier, PortDTO> dtoMap) { // if unauthorized for the client dto, simple return if (clientDto == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java index 894ce5b..a72132c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortsEntityMerger.java @@ -24,6 +24,8 @@ import java.util.Set; public class PortsEntityMerger { + private static final PortEntityMerger portEntityMerger = new PortEntityMerger(); + /** * Merges multiple PortEntity responses. * @@ -32,7 +34,7 @@ public class PortsEntityMerger { */ public static void mergePorts(final Set<PortEntity> portEntities, final Map<String, Map<NodeIdentifier, PortEntity>> entityMap) { for (final PortEntity entity : portEntities) { - PortEntityMerger.mergePorts(entity, entityMap.get(entity.getId())); + portEntityMerger.merge(entity, entityMap.get(entity.getId())); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java index a9a1b40..67278a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java @@ -17,27 +17,28 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import java.util.Map; -public class ProcessGroupEntityMerger { +public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGroupEntity>, ComponentEntityStatusMerger<ProcessGroupStatusDTO> { - /** - * Merges the ProcessorGroupEntity responses. - * - * @param clientEntity the entity being returned to the client - * @param entityMap all node responses - */ - public static void mergeProcessGroups(final ProcessGroupEntity clientEntity, final Map<NodeIdentifier, ProcessGroupEntity> entityMap) { - for (final Map.Entry<NodeIdentifier, ProcessGroupEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final ProcessGroupEntity entity = entry.getValue(); - if (entity != clientEntity) { - StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + @Override + public void merge(ProcessGroupEntity clientEntity, Map<NodeIdentifier, ProcessGroupEntity> entityMap) { + ComponentEntityMerger.super.merge(clientEntity, entityMap); + for (Map.Entry<NodeIdentifier, ProcessGroupEntity> entry : entityMap.entrySet()) { + final ProcessGroupEntity entityStatus = entry.getValue(); + if (entityStatus != clientEntity) { + mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); } } + } - ComponentEntityMerger.mergeComponents(clientEntity, entityMap); + @Override + public void mergeStatus(ProcessGroupStatusDTO clientStatus, boolean clientStatusReadablePermission, ProcessGroupStatusDTO status, boolean statusReadablePermission, + NodeIdentifier statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), + statusNodeIdentifier.getApiPort()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java index 6a0e519..2613487 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupsEntityMerger.java @@ -24,6 +24,8 @@ import java.util.Set; public class ProcessGroupsEntityMerger { + private static final ProcessGroupEntityMerger processGroupEntityMerger = new ProcessGroupEntityMerger(); + /** * Merges multiple ProcessGroupEntity responses. * @@ -32,7 +34,7 @@ public class ProcessGroupsEntityMerger { */ public static void mergeProcessGroups(final Set<ProcessGroupEntity> processGroupEntities, final Map<String, Map<NodeIdentifier, ProcessGroupEntity>> entityMap) { for (final ProcessGroupEntity entity : processGroupEntities) { - ProcessGroupEntityMerger.mergeProcessGroups(entity, entityMap.get(entity.getId())); + processGroupEntityMerger.merge(entity, entityMap.get(entity.getId())); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java index e730791..f3dbf1b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java @@ -18,13 +18,25 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.entity.ProcessorEntity; import java.util.HashMap; import java.util.Map; import java.util.Set; -public class ProcessorEntityMerger { +public class ProcessorEntityMerger implements ComponentEntityMerger<ProcessorEntity>, ComponentEntityStatusMerger<ProcessorStatusDTO> { + @Override + public void merge(ProcessorEntity clientEntity, Map<NodeIdentifier, ProcessorEntity> entityMap) { + ComponentEntityMerger.super.merge(clientEntity, entityMap); + for (Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessorEntity entityStatus = entry.getValue(); + if (entityStatus != clientEntity) { + mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); + } + } + } /** * Merges the ProcessorEntity responses. @@ -32,7 +44,7 @@ public class ProcessorEntityMerger { * @param clientEntity the entity being returned to the client * @param entityMap all node responses */ - public static void mergeProcessors(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) { + public void mergeComponents(final ProcessorEntity clientEntity, final Map<NodeIdentifier, ProcessorEntity> entityMap) { final ProcessorDTO clientDto = clientEntity.getComponent(); final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>(); for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { @@ -41,19 +53,16 @@ public class ProcessorEntityMerger { dtoMap.put(entry.getKey(), nodeProcDto); } - for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final ProcessorEntity entity = entry.getValue(); - if (entity != clientEntity) { - StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - ComponentEntityMerger.mergeComponents(clientEntity, entityMap); - mergeDtos(clientDto, dtoMap); } + @Override + public void mergeStatus(ProcessorStatusDTO clientStatus, boolean clientStatusReadablePermission, ProcessorStatusDTO status, boolean statusReadablePermission, NodeIdentifier + statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), + statusNodeIdentifier.getApiPort()); + } + private static void mergeDtos(final ProcessorDTO clientDto, final Map<NodeIdentifier, ProcessorDTO> dtoMap) { // if unauthorized for the client dto, simple return if (clientDto == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java index cf4ef7d..21b4f7e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorsEntityMerger.java @@ -24,6 +24,8 @@ import java.util.Set; public class ProcessorsEntityMerger { + private static final ProcessorEntityMerger processorEntityMerger = new ProcessorEntityMerger(); + /** * Merges multiple ProcessorEntity responses. * @@ -32,7 +34,7 @@ public class ProcessorsEntityMerger { */ public static void mergeProcessors(final Set<ProcessorEntity> processorEntities, final Map<String, Map<NodeIdentifier, ProcessorEntity>> entityMap) { for (final ProcessorEntity entity : processorEntities) { - ProcessorEntityMerger.mergeProcessors(entity, entityMap.get(entity.getId())); + processorEntityMerger.merge(entity, entityMap.get(entity.getId())); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java index 2415285..3209e02 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java @@ -20,6 +20,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import java.util.HashMap; @@ -27,7 +28,18 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -public class RemoteProcessGroupEntityMerger { +public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<RemoteProcessGroupEntity>, ComponentEntityStatusMerger<RemoteProcessGroupStatusDTO> { + @Override + public void merge(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) { + ComponentEntityMerger.super.merge(clientEntity, entityMap); + for (Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final RemoteProcessGroupEntity entityStatus = entry.getValue(); + if (entityStatus != clientEntity) { + mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); + } + } + } /** * Merges the RemoteProcessGroupEntity responses. @@ -35,7 +47,7 @@ public class RemoteProcessGroupEntityMerger { * @param clientEntity the entity being returned to the client * @param entityMap all node responses */ - public static void mergeRemoteProcessGroups(final RemoteProcessGroupEntity clientEntity, final Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) { + public void mergeComponents(final RemoteProcessGroupEntity clientEntity, final Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) { final RemoteProcessGroupDTO clientDto = clientEntity.getComponent(); final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new HashMap<>(); for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) { @@ -44,19 +56,16 @@ public class RemoteProcessGroupEntityMerger { dtoMap.put(entry.getKey(), nodeProcDto); } - for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); - final RemoteProcessGroupEntity entity = entry.getValue(); - if (entity != clientEntity) { - StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); - } - } - - ComponentEntityMerger.mergeComponents(clientEntity, entityMap); - mergeDtos(clientDto, dtoMap); } + @Override + public void mergeStatus(RemoteProcessGroupStatusDTO clientStatus, boolean clientStatusReadablePermission, RemoteProcessGroupStatusDTO status, + boolean statusReadablePermission, NodeIdentifier statusNodeIdentifier) { + StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), + statusNodeIdentifier.getApiPort()); + } + private static void mergeDtos(final RemoteProcessGroupDTO clientDto, final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap) { // if unauthorized for the client dto, simple return if (clientDto == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java index 8c94aa3..d7cf992 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java @@ -24,6 +24,8 @@ import java.util.Set; public class RemoteProcessGroupsEntityMerger { + private static final RemoteProcessGroupEntityMerger remoteProcessGroupEntityMerger = new RemoteProcessGroupEntityMerger(); + /** * Merges multiple RemoteProcessGroupEntity responses. * @@ -32,7 +34,7 @@ public class RemoteProcessGroupsEntityMerger { */ public static void mergeRemoteProcessGroups(final Set<RemoteProcessGroupEntity> remoteProcessGroupEntities, final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap) { for (final RemoteProcessGroupEntity entity : remoteProcessGroupEntities) { - RemoteProcessGroupEntityMerger.mergeRemoteProcessGroups(entity, entityMap.get(entity.getId())); + remoteProcessGroupEntityMerger.merge(entity, entityMap.get(entity.getId())); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java index 8f0b281..552983d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -public class ReportingTaskEntityMerger { +public class ReportingTaskEntityMerger implements ComponentEntityMerger<ReportingTaskEntity> { /** * Merges the ReportingTaskEntity responses. @@ -32,7 +32,7 @@ public class ReportingTaskEntityMerger { * @param clientEntity the entity being returned to the client * @param entityMap all node responses */ - public static void mergeReportingTasks(final ReportingTaskEntity clientEntity, final Map<NodeIdentifier, ReportingTaskEntity> entityMap) { + public void mergeComponents(final ReportingTaskEntity clientEntity, final Map<NodeIdentifier, ReportingTaskEntity> entityMap) { final ReportingTaskDTO clientDto = clientEntity.getComponent(); final Map<NodeIdentifier, ReportingTaskDTO> dtoMap = new HashMap<>(); for (final Map.Entry<NodeIdentifier, ReportingTaskEntity> entry : entityMap.entrySet()) { @@ -41,8 +41,6 @@ public class ReportingTaskEntityMerger { dtoMap.put(entry.getKey(), nodeReportingTaskDto); } - ComponentEntityMerger.mergeComponents(clientEntity, entityMap); - mergeDtos(clientDto, dtoMap); } http://git-wip-us.apache.org/repos/asf/nifi/blob/572dfed7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java index 764f4d0..516e32d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java @@ -24,6 +24,8 @@ import java.util.Set; public class ReportingTasksEntityMerger { + private static final ReportingTaskEntityMerger reportingTaskEntityMerger = new ReportingTaskEntityMerger(); + /** * Merges multiple ReportingTaskEntity responses. * @@ -32,7 +34,7 @@ public class ReportingTasksEntityMerger { */ public static void mergeReportingTasks(final Set<ReportingTaskEntity> reportingTaskEntities, final Map<String, Map<NodeIdentifier, ReportingTaskEntity>> entityMap) { for (final ReportingTaskEntity entity : reportingTaskEntities) { - ReportingTaskEntityMerger.mergeReportingTasks(entity, entityMap.get(entity.getId())); + reportingTaskEntityMerger.merge(entity, entityMap.get(entity.getId())); } } }