http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java new file mode 100644 index 0000000..2415285 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class RemoteProcessGroupEntityMerger { + + /** + * Merges the RemoteProcessGroupEntity responses. + * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergeRemoteProcessGroups(final RemoteProcessGroupEntity clientEntity, final Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) { + final RemoteProcessGroupDTO clientDto = clientEntity.getComponent(); + final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) { + final RemoteProcessGroupEntity nodeProcEntity = entry.getValue(); + final RemoteProcessGroupDTO nodeProcDto = nodeProcEntity.getComponent(); + dtoMap.put(entry.getKey(), nodeProcDto); + } + + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final RemoteProcessGroupEntity entity = entry.getValue(); + if (entity != clientEntity) { + StatusMerger.merge(clientEntity.getStatus(), entity.getStatus(), nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + ComponentEntityMerger.mergeComponents(clientEntity, entityMap); + + mergeDtos(clientDto, dtoMap); + } + + private static void mergeDtos(final RemoteProcessGroupDTO clientDto, final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap) { + // if unauthorized for the client dto, simply return + if (clientDto == null) { + return; + } + + final 
RemoteProcessGroupContentsDTO remoteProcessGroupContents = clientDto.getContents(); + + final Map<String, Set<NodeIdentifier>> authorizationErrorMap = new HashMap<>(); + Boolean mergedIsTargetSecure = null; + final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new HashSet<>(); + final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>(); + + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry : dtoMap.entrySet()) { + final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeEntry.getValue(); + + // consider the node remote process group when authorized + if (nodeRemoteProcessGroup != null) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + + // merge the authorization errors + ErrorMerger.mergeErrors(authorizationErrorMap, nodeId, nodeRemoteProcessGroup.getAuthorizationIssues()); + + // use the first target secure flag since they will all be the same + final Boolean nodeIsTargetSecure = nodeRemoteProcessGroup.isTargetSecure(); + if (mergedIsTargetSecure == null) { + mergedIsTargetSecure = nodeIsTargetSecure; + } + + // merge the ports in the contents + final RemoteProcessGroupContentsDTO nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroup.getContents(); + if (remoteProcessGroupContents != null && nodeRemoteProcessGroupContentsDto != null) { + if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) { + mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts()); + } + if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != null) { + mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts()); + } + } + } + + } + + if (remoteProcessGroupContents != null) { + if (!mergedInputPorts.isEmpty()) { + remoteProcessGroupContents.setInputPorts(mergedInputPorts); + } + if (!mergedOutputPorts.isEmpty()) { + remoteProcessGroupContents.setOutputPorts(mergedOutputPorts); + } + } + + if (mergedIsTargetSecure != null) { + clientDto.setTargetSecure(mergedIsTargetSecure); + } + + // set the merged 
authorization issues + clientDto.setAuthorizationIssues(ErrorMerger.normalizedMergedErrors(authorizationErrorMap, dtoMap.size())); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java new file mode 100644 index 0000000..8c94aa3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupsEntityMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; + +import java.util.Map; +import java.util.Set; + +public class RemoteProcessGroupsEntityMerger { + + /** + * Merges multiple RemoteProcessGroupEntity responses. + * + * @param remoteProcessGroupEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeRemoteProcessGroups(final Set<RemoteProcessGroupEntity> remoteProcessGroupEntities, final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap) { + for (final RemoteProcessGroupEntity entity : remoteProcessGroupEntities) { + RemoteProcessGroupEntityMerger.mergeRemoteProcessGroups(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java new file mode 100644 index 0000000..8f0b281 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTaskEntityMerger.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class ReportingTaskEntityMerger { + + /** + * Merges the ReportingTaskEntity responses. + * + * @param clientEntity the entity being returned to the client + * @param entityMap all node responses + */ + public static void mergeReportingTasks(final ReportingTaskEntity clientEntity, final Map<NodeIdentifier, ReportingTaskEntity> entityMap) { + final ReportingTaskDTO clientDto = clientEntity.getComponent(); + final Map<NodeIdentifier, ReportingTaskDTO> dtoMap = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, ReportingTaskEntity> entry : entityMap.entrySet()) { + final ReportingTaskEntity nodeReportingTaskEntity = entry.getValue(); + final ReportingTaskDTO nodeReportingTaskDto = nodeReportingTaskEntity.getComponent(); + dtoMap.put(entry.getKey(), nodeReportingTaskDto); + } + + ComponentEntityMerger.mergeComponents(clientEntity, entityMap); + + mergeDtos(clientDto, dtoMap); + } + + private static void mergeDtos(final ReportingTaskDTO clientDto, final Map<NodeIdentifier, ReportingTaskDTO> dtoMap) { + // if unauthorized for the client dto, simply return + if (clientDto == null) { + return; 
+ } + + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + + int activeThreadCount = 0; + for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : dtoMap.entrySet()) { + final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue(); + + // consider the node reporting task if authorized + if (nodeReportingTask != null) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + + if (nodeReportingTask.getActiveThreadCount() != null) { + activeThreadCount += nodeReportingTask.getActiveThreadCount(); + } + + // merge the validation errors + ErrorMerger.mergeErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors()); + } + } + + // set the merged active thread counts + clientDto.setActiveThreadCount(activeThreadCount); + + // set the merged validation errors + clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size())); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java new file mode 100644 index 0000000..764f4d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ReportingTasksEntityMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; + +import java.util.Map; +import java.util.Set; + +public class ReportingTasksEntityMerger { + + /** + * Merges multiple ReportingTaskEntity responses. + * + * @param reportingTaskEntities entities being returned to the client + * @param entityMap all node responses + */ + public static void mergeReportingTasks(final Set<ReportingTaskEntity> reportingTaskEntities, final Map<String, Map<NodeIdentifier, ReportingTaskEntity>> entityMap) { + for (final ReportingTaskEntity entity : reportingTaskEntities) { + ReportingTaskEntityMerger.mergeReportingTasks(entity, entityMap.get(entity.getId())); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java index f7890b8..e59f096 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java @@ -17,8 +17,9 @@ package org.apache.nifi.cluster.coordination.http.endpoints; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import org.apache.nifi.cluster.manager.ErrorMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.junit.Test; import java.util.ArrayList; import java.util.HashMap; @@ -26,8 +27,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestProcessorEndpointMerger { @@ -41,13 +42,13 @@ public class TestProcessorEndpointMerger { nodeValidationErrors1234.add("error 1"); nodeValidationErrors1234.add("error 2"); - merger.mergeValidationErrors(validationErrorMap, nodeId1234, nodeValidationErrors1234); + ErrorMerger.mergeErrors(validationErrorMap, nodeId1234, nodeValidationErrors1234); final NodeIdentifier nodeXyz = new NodeIdentifier("xyz", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false); final List<String> nodeValidationErrorsXyz = new ArrayList<>(); nodeValidationErrorsXyz.add("error 1"); - merger.mergeValidationErrors(validationErrorMap, nodeXyz, nodeValidationErrorsXyz); + ErrorMerger.mergeErrors(validationErrorMap, nodeXyz, nodeValidationErrorsXyz); assertEquals(2, validationErrorMap.size()); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 54beeb0..f4e70af 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,40 +16,7 @@ */ package org.apache.nifi.controller; -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; -import 
java.util.concurrent.locks.ReentrantReadWriteLock; - -import javax.net.ssl.SSLContext; - +import com.sun.jersey.api.client.ClientHandlerException; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.admin.service.AuditService; @@ -231,7 +198,37 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.sun.jersey.api.client.ClientHandlerException; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static java.util.Objects.requireNonNull; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable { @@ -2484,10 +2481,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final int flowFilesOutOrRemoved = flowFilesTransferred + flowFilesRemoved; status.setAverageLineageDuration(flowFilesOutOrRemoved == 0 ? 
0 : lineageMillis / flowFilesOutOrRemoved, TimeUnit.MILLISECONDS); - if (remoteGroup.getAuthorizationIssue() != null) { - status.setAuthorizationIssues(Arrays.asList(remoteGroup.getAuthorizationIssue())); - } - return status; } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java index 7202546..6ab5458 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.events; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; @@ -30,6 +23,13 @@ import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.RingBuffer.Filter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + public class VolatileBulletinRepository implements BulletinRepository { private 
static final int CONTROLLER_BUFFER_SIZE = 10; @@ -136,6 +136,11 @@ public class VolatileBulletinRepository implements BulletinRepository { } @Override + public List<Bulletin> findBulletinsForSource(String sourceId) { + return findBulletins(new BulletinQuery.Builder().sourceIdMatches(sourceId).limit(COMPONENT_BUFFER_SIZE).build()); + } + + @Override public List<Bulletin> findBulletinsForGroupBySource(String groupId) { return findBulletinsForGroupBySource(groupId, COMPONENT_BUFFER_SIZE); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 864c06e..25ffed1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -206,6 +206,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // revision manager private RevisionManager revisionManager; + private BulletinRepository bulletinRepository; // data access objects private ProcessorDAO processorDAO; @@ -633,7 +634,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processorNode); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier())); - return new 
UpdateResult<>(entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status), false); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins), false); } @Override @@ -772,7 +774,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(inputPortNode); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortNode.getIdentifier())); - return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status), false); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins), false); } @Override @@ -790,7 +793,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(outputPortNode); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortNode.getIdentifier())); - return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status), false); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortNode.getIdentifier())); + return new 
UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins), false); } @Override @@ -810,7 +814,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroupNode); final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroupNode.getIdentifier())); - return new UpdateResult<>(entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, accessPolicy, status), false); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroupNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, accessPolicy, status, bulletins), false); } @Override @@ -865,7 +870,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroupNode); final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); - return new UpdateResult<>(entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, accessPolicy, status), false); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier())); + return new UpdateResult<>(entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, accessPolicy, status, bulletins), false); } @Override @@ 
-1047,7 +1053,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> processorDAO.deleteProcessor(processorId), dtoFactory.createProcessorDto(processor)); - return entityFactory.createProcessorEntity(snapshot, null, null, null); + return entityFactory.createProcessorEntity(snapshot, null, null, null, null); } @Override @@ -1146,7 +1152,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> inputPortDAO.deletePort(inputPortId), dtoFactory.createPortDto(port)); - return entityFactory.createPortEntity(snapshot, null, null, null); + return entityFactory.createPortEntity(snapshot, null, null, null, null); } @Override @@ -1158,7 +1164,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> outputPortDAO.deletePort(outputPortId), dtoFactory.createPortDto(port)); - return entityFactory.createPortEntity(snapshot, null, null, null); + return entityFactory.createPortEntity(snapshot, null, null, null, null); } @Override @@ -1170,7 +1176,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> processGroupDAO.deleteProcessGroup(groupId), dtoFactory.createProcessGroupDto(processGroup)); - return entityFactory.createProcessGroupEntity(snapshot, null, null, null); + return entityFactory.createProcessGroupEntity(snapshot, null, null, null, null); } @Override @@ -1182,7 +1188,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - return entityFactory.createRemoteProcessGroupEntity(snapshot, null, null, null); + return entityFactory.createRemoteProcessGroupEntity(snapshot, null, null, null, null); } @Override @@ -1240,7 +1246,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId()); final AccessPolicyDTO accessPolicy = 
dtoFactory.createAccessPolicyDto(processor); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId())); - return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId())); + return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins); } @Override @@ -1406,7 +1413,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Port port = inputPortDAO.getPort(inputPortDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins); } @Override @@ -1419,7 +1427,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Port port = outputPortDAO.getPort(outputPortDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); - return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); + final List<BulletinDTO> bulletins = 
dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins); } @Override @@ -1432,7 +1441,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroup); final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); - return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier())); + return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins); } @Override @@ -1445,7 +1455,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroup); final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(remoteProcessGroup.getIdentifier())); - return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getIdentifier())); + return 
entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, status, bulletins); } @Override @@ -1601,7 +1612,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); final FlowModification lastMod = new FlowModification(incrementRevision(revision), modifier); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); - final ProcessorEntity entity = entityFactory.createProcessorEntity(updatedProcDto, dtoFactory.createRevisionDTO(lastMod), accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); + final ProcessorEntity entity = entityFactory.createProcessorEntity(updatedProcDto, dtoFactory.createRevisionDTO(lastMod), accessPolicy, status, bulletins); return new StandardRevisionUpdate<>(entity, lastMod); } @@ -1631,7 +1643,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(controllerService); - return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); + return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, bulletins); } @Override @@ -1649,7 +1662,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { cs -> 
dtoFactory.createControllerServiceDto(cs)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(controllerService); - return new UpdateResult<>(entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId())); + return new UpdateResult<>(entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, bulletins), false); } @Override @@ -1811,7 +1825,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> controllerServiceDAO.deleteControllerService(controllerServiceId), dtoFactory.createControllerServiceDto(controllerService)); - return entityFactory.createControllerServiceEntity(snapshot, null, null); + return entityFactory.createControllerServiceEntity(snapshot, null, null, null); } @@ -1837,7 +1851,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); final FlowModification lastMod = new FlowModification(new Revision(0L, rev.getClientId(), dto.getId()), modifier); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(reportingTask); - return entityFactory.createReportingTaskEntity(dto, dtoFactory.createRevisionDTO(lastMod), accessPolicy); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); + return entityFactory.createReportingTaskEntity(dto, dtoFactory.createRevisionDTO(lastMod), accessPolicy, bulletins); }); } @@ -1856,7 +1871,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { rt -> dtoFactory.createReportingTaskDto(rt)); final AccessPolicyDTO accessPolicy = 
dtoFactory.createAccessPolicyDto(reportingTask); - return new UpdateResult<>(entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); + return new UpdateResult<>(entityFactory.createReportingTaskEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy, bulletins), false); } @Override @@ -1868,7 +1884,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { () -> reportingTaskDAO.deleteReportingTask(reportingTaskId), dtoFactory.createReportingTaskDto(reportingTask)); - return entityFactory.createReportingTaskEntity(snapshot, null, null); + return entityFactory.createReportingTaskEntity(snapshot, null, null, null); } @Override @@ -2119,7 +2135,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = + dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); + return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, accessPolicy, status, bulletins); }) .collect(Collectors.toSet()); }); @@ -2177,7 +2195,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); final ProcessorStatusDTO status = 
dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id)); - return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, dtoFactory.createAccessPolicyDto(processor), status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(id)); + return entityFactory.createProcessorEntity(dtoFactory.createProcessorDto(processor), revision, dtoFactory.createAccessPolicyDto(processor), status, bulletins); }); } @@ -2215,9 +2234,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { .after(query.getAfter()) .limit(query.getLimit()); - // get the bulletin repository - final BulletinRepository bulletinRepository = controllerFacade.getBulletinRepository(); - // perform the query final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build()); @@ -2432,7 +2448,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); - return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); + return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status, bulletins); }) .collect(Collectors.toSet()); }); @@ -2452,7 +2469,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); final PortStatusDTO status = 
dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); - return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); + return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status, bulletins); }) .collect(Collectors.toSet()); }); @@ -2471,7 +2489,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier())); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(group); final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier())); - return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(group.getIdentifier())); + return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, accessPolicy, status, bulletins); }) .collect(Collectors.toSet()); }); @@ -2491,7 +2510,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier())); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(rpg); final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier())); - return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = 
dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier())); + return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, accessPolicy, status, bulletins); }) .collect(Collectors.toSet()); }); @@ -2506,7 +2526,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortId)); - return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPortId)); + return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status, bulletins); }); } @@ -2524,7 +2545,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortId)); - return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPortId)); + return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, accessPolicy, status, bulletins); }); } @@ -2542,7 +2564,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(rpg); final RemoteProcessGroupStatusDTO status = 
dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier())); - return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier())); + return entityFactory.createRemoteProcessGroupEntity(dtoFactory.createRemoteProcessGroupDto(rpg), revision, accessPolicy, status, bulletins); }); } @@ -2609,7 +2632,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroup); final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(groupId)); - return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(processGroup), revision, accessPolicy, status); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(groupId)); + return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(processGroup), revision, accessPolicy, status, bulletins); }); } @@ -2641,7 +2665,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(serviceNode); - return entityFactory.createControllerServiceEntity(dto, revision, accessPolicy); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier())); + return entityFactory.createControllerServiceEntity(dto, revision, accessPolicy, bulletins); }) .collect(Collectors.toSet()); }); @@ -2661,7 +2686,9 @@ public class 
StandardNiFiServiceFacade implements NiFiServiceFacade { final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref, Sets.newHashSet(controllerServiceId)); dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents()); - return entityFactory.createControllerServiceEntity(dto, revision, accessPolicy); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceId)); + + return entityFactory.createControllerServiceEntity(dto, revision, accessPolicy, bulletins); }); } @@ -2699,7 +2726,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { .map(reportingTask -> { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(reportingTask.getIdentifier())); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(reportingTask); - return entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, accessPolicy); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTask.getIdentifier())); + return entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, accessPolicy, bulletins); }) .collect(Collectors.toSet()); }); @@ -2713,7 +2741,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionDTO revision = dtoFactory.createRevisionDTO(rev); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(reportingTask); - return entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, accessPolicy); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(reportingTaskId)); + return 
entityFactory.createReportingTaskEntity(dtoFactory.createReportingTaskDto(reportingTask), revision, accessPolicy, bulletins); }); } @@ -2964,4 +2993,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public void setHeartbeatMonitor(HeartbeatMonitor heartbeatMonitor) { this.heartbeatMonitor = heartbeatMonitor; } + + public void setBulletinRepository(BulletinRepository bulletinRepository) { + this.bulletinRepository = bulletinRepository; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6eb895b..6dd3ecc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -98,6 +98,7 @@ import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; @@ -174,6 +175,7 @@ public final class DtoFactory { }; public static final String SENSITIVE_VALUE_MASK = "********"; + private BulletinRepository bulletinRepository; private ControllerServiceProvider controllerServiceProvider; private EntityFactory 
entityFactory; private Authorizer authorizer; @@ -777,7 +779,6 @@ public final class DtoFactory { snapshot.setBytesSent(remoteProcessGroupStatus.getSentContentSize()); snapshot.setFlowFilesReceived(remoteProcessGroupStatus.getReceivedCount()); snapshot.setBytesReceived(remoteProcessGroupStatus.getReceivedContentSize()); - snapshot.setAuthorizationIssues(remoteProcessGroupStatus.getAuthorizationIssues()); StatusMerger.updatePrettyPrintedFields(snapshot); return dto; @@ -1535,7 +1536,8 @@ public final class DtoFactory { () -> groupStatus.getInputPortStatus().stream().filter(inputPortStatus -> inputPort.getId().equals(inputPortStatus.getId())).findFirst().orElse(null), inputPortStatus -> createPortStatusDto(inputPortStatus) ); - flow.getInputPorts().add(entityFactory.createPortEntity(inputPort, revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPort.getId())); + flow.getInputPorts().add(entityFactory.createPortEntity(inputPort, revision, accessPolicy, status, bulletins)); } for (final PortDTO outputPort : snippet.getOutputPorts()) { @@ -1545,7 +1547,8 @@ public final class DtoFactory { () -> groupStatus.getOutputPortStatus().stream().filter(outputPortStatus -> outputPort.getId().equals(outputPortStatus.getId())).findFirst().orElse(null), outputPortStatus -> createPortStatusDto(outputPortStatus) ); - flow.getOutputPorts().add(entityFactory.createPortEntity(outputPort, revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPort.getId())); + flow.getOutputPorts().add(entityFactory.createPortEntity(outputPort, revision, accessPolicy, status, bulletins)); } for (final LabelDTO label : snippet.getLabels()) { @@ -1561,7 +1564,8 @@ public final class DtoFactory { () -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> 
processGroup.getId().equals(processGroupStatus.getId())).findFirst().orElse(null), processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus) ); - flow.getProcessGroups().add(entityFactory.createProcessGroupEntity(processGroup, revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getId())); + flow.getProcessGroups().add(entityFactory.createProcessGroupEntity(processGroup, revision, accessPolicy, status, bulletins)); } for (final ProcessorDTO processor : snippet.getProcessors()) { @@ -1571,7 +1575,8 @@ public final class DtoFactory { () -> groupStatus.getProcessorStatus().stream().filter(processorStatus -> processor.getId().equals(processorStatus.getId())).findFirst().orElse(null), processorStatus -> createProcessorStatusDto(processorStatus) ); - flow.getProcessors().add(entityFactory.createProcessorEntity(processor, revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getId())); + flow.getProcessors().add(entityFactory.createProcessorEntity(processor, revision, accessPolicy, status, bulletins)); } for (final RemoteProcessGroupDTO remoteProcessGroup : snippet.getRemoteProcessGroups()) { @@ -1581,7 +1586,8 @@ public final class DtoFactory { () -> groupStatus.getRemoteProcessGroupStatus().stream().filter(rpgStatus -> remoteProcessGroup.getId().equals(rpgStatus.getId())).findFirst().orElse(null), remoteProcessGroupStatus -> createRemoteProcessGroupStatusDto(remoteProcessGroupStatus) ); - flow.getRemoteProcessGroups().add(entityFactory.createRemoteProcessGroupEntity(remoteProcessGroup, revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(remoteProcessGroup.getId())); + flow.getRemoteProcessGroups().add(entityFactory.createRemoteProcessGroupEntity(remoteProcessGroup, revision, accessPolicy, 
status, bulletins)); } return flow; @@ -1608,7 +1614,8 @@ public final class DtoFactory { () -> groupStatus.getProcessorStatus().stream().filter(processorStatus -> procNode.getIdentifier().equals(processorStatus.getId())).findFirst().orElse(null), processorStatus -> createProcessorStatusDto(processorStatus) ); - dto.getProcessors().add(entityFactory.createProcessorEntity(createProcessorDto(procNode), revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(procNode.getIdentifier())); + dto.getProcessors().add(entityFactory.createProcessorEntity(createProcessorDto(procNode), revision, accessPolicy, status, bulletins)); } for (final Connection connNode : group.getConnections()) { @@ -1640,7 +1647,8 @@ public final class DtoFactory { () -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> childGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null), processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus) ); - dto.getProcessGroups().add(entityFactory.createProcessGroupEntity(createProcessGroupDto(childGroup), revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(childGroup.getIdentifier())); + dto.getProcessGroups().add(entityFactory.createProcessGroupEntity(createProcessGroupDto(childGroup), revision, accessPolicy, status, bulletins)); } for (final RemoteProcessGroup rpg : group.getRemoteProcessGroups()) { @@ -1650,7 +1658,8 @@ public final class DtoFactory { () -> groupStatus.getRemoteProcessGroupStatus().stream().filter(remoteProcessGroupStatus -> rpg.getIdentifier().equals(remoteProcessGroupStatus.getId())).findFirst().orElse(null), remoteProcessGroupStatus -> createRemoteProcessGroupStatusDto(remoteProcessGroupStatus) ); - dto.getRemoteProcessGroups().add(entityFactory.createRemoteProcessGroupEntity(createRemoteProcessGroupDto(rpg), 
revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier())); + dto.getRemoteProcessGroups().add(entityFactory.createRemoteProcessGroupEntity(createRemoteProcessGroupDto(rpg), revision, accessPolicy, status, bulletins)); } for (final Port inputPort : group.getInputPorts()) { @@ -1660,7 +1669,8 @@ public final class DtoFactory { () -> groupStatus.getInputPortStatus().stream().filter(inputPortStatus -> inputPort.getIdentifier().equals(inputPortStatus.getId())).findFirst().orElse(null), inputPortStatus -> createPortStatusDto(inputPortStatus) ); - dto.getInputPorts().add(entityFactory.createPortEntity(createPortDto(inputPort), revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(inputPort.getIdentifier())); + dto.getInputPorts().add(entityFactory.createPortEntity(createPortDto(inputPort), revision, accessPolicy, status, bulletins)); } for (final Port outputPort : group.getOutputPorts()) { @@ -1670,7 +1680,8 @@ public final class DtoFactory { () -> groupStatus.getOutputPortStatus().stream().filter(outputPortStatus -> outputPort.getIdentifier().equals(outputPortStatus.getId())).findFirst().orElse(null), outputPortStatus -> createPortStatusDto(outputPortStatus) ); - dto.getOutputPorts().add(entityFactory.createPortEntity(createPortDto(outputPort), revision, accessPolicy, status)); + final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPort.getIdentifier())); + dto.getOutputPorts().add(entityFactory.createPortEntity(createPortDto(outputPort), revision, accessPolicy, status, bulletins)); } return dto; @@ -2868,4 +2879,8 @@ public final class DtoFactory { public void setEntityFactory(EntityFactory entityFactory) { this.entityFactory = entityFactory; } + + public void setBulletinRepository(BulletinRepository bulletinRepository) { + this.bulletinRepository 
= bulletinRepository; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java index 6677301..e5c3d9c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java @@ -39,6 +39,8 @@ import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.SnippetEntity; +import java.util.List; + public final class EntityFactory { public ControllerConfigurationEntity createControllerConfigurationEntity(final ControllerConfigurationDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { @@ -61,7 +63,9 @@ public final class EntityFactory { return entity; } - public ProcessorEntity createProcessorEntity(final ProcessorDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, final ProcessorStatusDTO status) { + public ProcessorEntity createProcessorEntity(final ProcessorDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, + final ProcessorStatusDTO status, final List<BulletinDTO> bulletins) { + final ProcessorEntity entity = new ProcessorEntity(); entity.setRevision(revision); if (dto != null) { @@ -71,12 +75,13 @@ public final class EntityFactory { entity.setPosition(dto.getPosition()); if (accessPolicy 
!= null && accessPolicy.getCanRead()) { entity.setComponent(dto); + entity.setBulletins(bulletins); } } return entity; } - public PortEntity createPortEntity(final PortDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, final PortStatusDTO status) { + public PortEntity createPortEntity(final PortDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, final PortStatusDTO status, final List<BulletinDTO> bulletins) { final PortEntity entity = new PortEntity(); entity.setRevision(revision); if (dto != null) { @@ -87,12 +92,15 @@ public final class EntityFactory { entity.setPortType(dto.getType()); if (accessPolicy != null && accessPolicy.getCanRead()) { entity.setComponent(dto); + entity.setBulletins(bulletins); } } return entity; } - public ProcessGroupEntity createProcessGroupEntity(final ProcessGroupDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, final ProcessGroupStatusDTO status) { + public ProcessGroupEntity createProcessGroupEntity(final ProcessGroupDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, + final ProcessGroupStatusDTO status, final List<BulletinDTO> bulletins) { + final ProcessGroupEntity entity = new ProcessGroupEntity(); entity.setRevision(revision); if (dto != null) { @@ -110,6 +118,7 @@ public final class EntityFactory { entity.setInactiveRemotePortCount(dto.getInactiveRemotePortCount()); if (accessPolicy != null && accessPolicy.getCanRead()) { entity.setComponent(dto); + entity.setBulletins(bulletins); } } return entity; @@ -170,8 +179,8 @@ public final class EntityFactory { return entity; } - public RemoteProcessGroupEntity createRemoteProcessGroupEntity( - final RemoteProcessGroupDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy,final RemoteProcessGroupStatusDTO status) { + public RemoteProcessGroupEntity createRemoteProcessGroupEntity(final RemoteProcessGroupDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, + final 
RemoteProcessGroupStatusDTO status, final List<BulletinDTO> bulletins) { final RemoteProcessGroupEntity entity = new RemoteProcessGroupEntity(); entity.setRevision(revision); @@ -184,6 +193,7 @@ public final class EntityFactory { entity.setOutputPortCount(dto.getOutputPortCount()); if (accessPolicy != null && accessPolicy.getCanRead()) { entity.setComponent(dto); + entity.setBulletins(bulletins); } } return entity; @@ -209,7 +219,7 @@ public final class EntityFactory { return entity; } - public ReportingTaskEntity createReportingTaskEntity(final ReportingTaskDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { + public ReportingTaskEntity createReportingTaskEntity(final ReportingTaskDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, final List<BulletinDTO> bulletins) { final ReportingTaskEntity entity = new ReportingTaskEntity(); entity.setRevision(revision); if (dto != null) { @@ -217,13 +227,14 @@ public final class EntityFactory { entity.setId(dto.getId()); if (accessPolicy != null && accessPolicy.getCanRead()) { entity.setComponent(dto); + entity.setBulletins(bulletins); } } return entity; } - public ControllerServiceEntity createControllerServiceEntity(final ControllerServiceDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { + public ControllerServiceEntity createControllerServiceEntity(final ControllerServiceDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, final List<BulletinDTO> bulletins) { final ControllerServiceEntity entity = new ControllerServiceEntity(); entity.setRevision(revision); if (dto != null) { @@ -232,6 +243,7 @@ public final class EntityFactory { entity.setPosition(dto.getPosition()); if (accessPolicy != null && accessPolicy.getCanRead()) { entity.setComponent(dto); + entity.setBulletins(bulletins); } } return entity; 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index f234fde..37ef237 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -16,27 +16,6 @@ */ package org.apache.nifi.web.controller; -import java.io.IOException; -import java.io.InputStream; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TimeZone; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - -import javax.ws.rs.WebApplicationException; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; @@ -131,6 +110,26 @@ import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.WebApplicationException; +import java.io.IOException; +import java.io.InputStream; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Collection; +import 
java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TimeZone; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + public class ControllerFacade implements Authorizable { private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class); @@ -140,6 +139,7 @@ public class ControllerFacade implements Authorizable { private FlowService flowService; private KeyService keyService; private ClusterCoordinator clusterCoordinator; + private BulletinRepository bulletinRepository; // properties private NiFiProperties properties; @@ -479,7 +479,6 @@ public class ControllerFacade implements Authorizable { controllerStatus.setConnectedNodes(connectedNodeCount + " / " + totalNodeCount); } - final BulletinRepository bulletinRepository = getBulletinRepository(); controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController())); // get the controller service bulletins @@ -663,15 +662,6 @@ public class ControllerFacade implements Authorizable { } /** - * Gets the BulletinRepository. - * - * @return the BulletinRepository - */ - public BulletinRepository getBulletinRepository() { - return flowController.getBulletinRepository(); - } - - /** * Saves the state of the flow controller. 
* * @throws NiFiCoreException ex @@ -1706,4 +1696,8 @@ public class ControllerFacade implements Authorizable { public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) { this.clusterCoordinator = clusterCoordinator; } + + public void setBulletinRepository(BulletinRepository bulletinRepository) { + this.bulletinRepository = bulletinRepository; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 0fadf84..1ca34dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -52,6 +52,7 @@ <property name="controllerServiceProvider" ref="controllerServiceProvider" /> <property name="entityFactory" ref="entityFactory"/> <property name="authorizer" ref="authorizer"/> + <property name="bulletinRepository" ref="bulletinRepository"/> </bean> <!-- snippet utils --> @@ -113,6 +114,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator" /> <property name="keyService" ref="keyService"/> <property name="dtoFactory" ref="dtoFactory"/> + <property name="bulletinRepository" ref="bulletinRepository"/> </bean> <bean id="serviceFacade" class="org.apache.nifi.web.StandardNiFiServiceFacade"> <property name="properties" ref="nifiProperties"/> @@ -138,6 +140,7 @@ <property name="entityFactory" ref="entityFactory"/> <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="heartbeatMonitor" 
ref="heartbeatMonitor" /> + <property name="bulletinRepository" ref="bulletinRepository"/> </bean> <!-- component ui extension configuration context --> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/flow-status.css ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/flow-status.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/flow-status.css index fcfd5a2..e8ee677 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/flow-status.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/flow-status.css @@ -92,6 +92,10 @@ background-color: #728E9B; /*base-color*/ } +#bulletin-button.has-bulletins { + background-color: #ba554a; /*warm-color*/ +} + #bulletin-button i.fa { color: #fff; font-size: 15px; http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js index 7937874..eda752f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-flow-status-controller.js @@ -287,7 +287,15 @@ nf.ng.Canvas.FlowStatusCtrl = function (serviceProvider, $sanitize) { // no bulletins before, show icon and tips bulletinIcon.addClass('has-bulletins').qtip($.extend({ content: newBulletins - }, nf.CanvasUtils.config.systemTooltipConfig)); + }, nf.CanvasUtils.config.systemTooltipConfig, { + position: { + at: 'bottom left', + my: 'top right', + adjust: { + x: 4 + } + } + })); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js index bfa567d..6e37221 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js @@ -110,10 +110,7 @@ nf.CanvasUtils = (function () { }, position: { at: 'bottom right', - my: 'top left', - adjust: { - method: 'flipinvert flipinvert' - } + my: 'top left' } } }, @@ -493,7 +490,7 @@ nf.CanvasUtils = (function () { } // if there are bulletins show them, otherwise hide - if (!nf.Common.isEmpty(d.status.bulletins)) { + if (!nf.Common.isEmpty(d.bulletins)) { // update the tooltip selection.select('text.bulletin-icon') .each(function () { @@ -505,7 +502,7 @@ nf.CanvasUtils = (function () { .attr('class', 'tooltip nifi-tooltip') .html(function () { // format the bulletins - var bulletins = 
nf.Common.getFormattedBulletins(d.status.bulletins); + var bulletins = nf.Common.getFormattedBulletins(d.bulletins); // create the unordered list based off the formatted bulletins var list = nf.Common.formatUnorderedList(bulletins); @@ -519,6 +516,12 @@ nf.CanvasUtils = (function () { // add the tooltip nf.CanvasUtils.canvasTooltip(tip, d3.select(this)); }); + + // update the tooltip background + selection.select('rect.bulletin-background').classed('has-bulletins', true); + } else { + // update the tooltip background + selection.select('rect.bulletin-background').classed('has-bulletins', false); } }, http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-services.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-services.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-services.js index 78e9bca..d8e62b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-services.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-services.js @@ -496,7 +496,7 @@ nf.ControllerServices = (function () { markup += '<img src="images/iconUsage.png" title="Usage" class="pointer controller-service-usage" style="margin-left: 6px; margin-top: 3px; float: left;" />'; var hasErrors = !nf.Common.isEmpty(dataContext.component.validationErrors); - var hasBulletins = !nf.Common.isEmpty(dataContext.component.bulletins); + var hasBulletins = !nf.Common.isEmpty(dataContext.bulletins); if (hasErrors) { markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 3px; 
float: left;" />'; @@ -689,7 +689,7 @@ nf.ControllerServices = (function () { var controllerServiceEntity = controllerServicesData.getItemById(taskId); // format the tooltip - var bulletins = nf.Common.getFormattedBulletins(controllerServiceEntity.component.bulletins); + var bulletins = nf.Common.getFormattedBulletins(controllerServiceEntity.bulletins); var tooltip = nf.Common.formatUnorderedList(bulletins); // show the tooltip http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js index b6ba6ba..d4dbe37 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group.js @@ -679,7 +679,7 @@ nf.ProcessGroup = (function () { 'x': function () { return processGroupData.dimensions.width - 17; }, - 'y': 50 + 'y': 49 }) .text('\uf24a'); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b437e09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js index 66b02cb..6f1a025 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor.js @@ -467,9 +467,9 @@ nf.Processor = (function () { .attr({ 'class': 'bulletin-icon', 'x': function (d) { - return processorData.dimensions.width - 18; + return processorData.dimensions.width - 17; }, - 'y': 18 + 'y': 17 }) .text('\uf24a'); }