This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 3bb79f1ba6 NIFI-13476 Optimized Authorization Checking in Process Group Status (#9021) 3bb79f1ba6 is described below commit 3bb79f1ba65dc7bc4dc5cd96e0e806ba2f9ffe86 Author: David Handermann <exceptionfact...@apache.org> AuthorDate: Mon Jul 1 13:58:22 2024 -0500 NIFI-13476 Optimized Authorization Checking in Process Group Status (#9021) - Avoided invoking Authorizer when nested component is not included in Process Group Status based on depth of recursion --- .../apache/nifi/reporting/AbstractEventAccess.java | 25 ++- .../nifi/reporting/AbstractEventAccessTest.java | 231 +++++++++++++++++++++ 2 files changed, 249 insertions(+), 7 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java index 9c22b407b7..373b21babc 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java @@ -72,6 +72,9 @@ import java.util.function.Predicate; public abstract class AbstractEventAccess implements EventAccess { private static final Logger logger = LoggerFactory.getLogger(AbstractEventAccess.class); + private static final Predicate<Authorizable> AUTHORIZATION_APPROVED = authorizable -> true; + private static final Predicate<Authorizable> AUTHORIZATION_DENIED = authorizable -> false; + private final ProcessScheduler processScheduler; private final StatusAnalyticsEngine statusAnalyticsEngine; private final FlowManager flowManager; @@ -96,7 +99,7 @@ public abstract class AbstractEventAccess implements EventAccess { public ProcessGroupStatus getGroupStatus(final String groupId) { final RepositoryStatusReport statusReport = generateRepositoryStatusReport(); final ProcessGroup group = flowManager.getGroup(groupId); - return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, true); + return getGroupStatus(group, statusReport, AUTHORIZATION_APPROVED, Integer.MAX_VALUE, 1, true); } /** @@ -112,7 +115,7 @@ public abstract class AbstractEventAccess implements EventAccess { final ProcessGroup group = flowManager.getGroup(groupId); // this was invoked with no user context so the results will be unfiltered... necessary for aggregating status history - return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, false); + return getGroupStatus(group, statusReport, AUTHORIZATION_APPROVED, Integer.MAX_VALUE, 1, false); } protected RepositoryStatusReport generateRepositoryStatusReport() { @@ -127,13 +130,13 @@ public abstract class AbstractEventAccess implements EventAccess { * * @param group group id * @param statusReport report - * @param isAuthorized is authorized check + * @param checkAuthorization is authorized check * @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus * @param currentDepth the current number of levels deep that we have recursed * @param includeConnectionDetails whether or not to include the details of the connections that may be expensive to calculate and/or require locks be obtained * @return the component status */ - ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized, + ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> checkAuthorization, final int recursiveStatusDepth, final int currentDepth, final boolean includeConnectionDetails) { if (group == null) { return null; @@ -141,7 +144,7 @@ public abstract class AbstractEventAccess implements EventAccess { final ProcessGroupStatus status = new ProcessGroupStatus(); status.setId(group.getIdentifier()); - status.setName(isAuthorized.test(group) ? group.getName() : group.getIdentifier()); + status.setName(checkAuthorization.test(group) ? group.getName() : group.getIdentifier()); int activeGroupThreads = 0; int terminatedGroupThreads = 0; long bytesRead = 0L; @@ -162,6 +165,14 @@ public abstract class AbstractEventAccess implements EventAccess { final boolean populateChildStatuses = currentDepth <= recursiveStatusDepth; + // Set Authorization predicate based on whether to populate child component status avoiding unnecessary calls to Authorizer + final Predicate<Authorizable> isAuthorized; + if (populateChildStatuses) { + isAuthorized = checkAuthorization; + } else { + isAuthorized = AUTHORIZATION_DENIED; + } + // set status for processors final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>(); status.setProcessorStatus(processorStatusCollection); @@ -196,7 +207,7 @@ public abstract class AbstractEventAccess implements EventAccess { // avoid performing any sort of authorizations. Because we only care about the numbers that come back, we can just indicate // that the user is not authorized. This allows us to avoid the expense of both performing the authorization and calculating // things that we would otherwise need to calculate if the user were in fact authorized. - childGroupStatus = getGroupStatus(childGroup, statusReport, authorizable -> false, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails); + childGroupStatus = getGroupStatus(childGroup, statusReport, AUTHORIZATION_DENIED, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails); } activeGroupThreads += childGroupStatus.getActiveThreadCount(); @@ -686,7 +697,7 @@ public abstract class AbstractEventAccess implements EventAccess { final ProcessGroup group = flowManager.getGroup(rootGroupId); final RepositoryStatusReport statusReport = generateRepositoryStatusReport(); - return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, true); + return getGroupStatus(group, statusReport, AUTHORIZATION_APPROVED, Integer.MAX_VALUE, 1, true); } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java new file mode 100644 index 0000000000..7661d1996c --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +import org.apache.nifi.action.Action; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.repository.RepositoryStatusReport; +import org.apache.nifi.controller.repository.StandardRepositoryStatusReport; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; +import org.apache.nifi.diagnostics.StorageUsage; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Predicate; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class AbstractEventAccessTest { + + private static final boolean INCLUDE_CONNECTION_DETAILS = true; + + private static final String PROCESS_GROUP_NAME = "Root Group"; + + private static final String PROCESS_GROUP_ID = UUID.randomUUID().toString(); + + private static final String PROCESSOR_NAME = "Event Processor"; + + private static final String PROCESSOR_ID = UUID.randomUUID().toString(); + + private static final int ZERO_DEPTH = 0; + + private static final int SINGLE_DEPTH = 1; + + @Mock + private ProcessScheduler processScheduler; + + @Mock + private StatusAnalyticsEngine statusAnalyticsEngine; + + @Mock + private FlowManager flowManager; + + @Mock + private FlowFileEventRepository flowFileEventRepository; + + @Mock + private ProcessGroup processGroup; + + @Mock + private ProcessorNode processorNode; + + private AbstractEventAccess eventAccess; + + @BeforeEach + void setEventAccess() { + eventAccess = new ConcreteEventAccess(processScheduler, statusAnalyticsEngine, flowManager, flowFileEventRepository); + } + + @Test + void testGetGroupStatusAuthorized() { + final List<Authorizable> authorizables = new ArrayList<>(); + + final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport(); + final Predicate<Authorizable> checkAuthorization = authorizables::add; + + when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME); + when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID); + + final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS); + + assertNotNull(groupStatus); + assertEquals(PROCESS_GROUP_NAME, groupStatus.getName()); + assertEquals(PROCESS_GROUP_ID, groupStatus.getId()); + + assertEquals(1, authorizables.size()); + } + + @Test + void testGetGroupStatusProcessorAuthorizedNotIncluded() { + final List<Authorizable> authorizables = new ArrayList<>(); + + final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport(); + final Predicate<Authorizable> checkAuthorization = authorizables::add; + + when(processorNode.getIdentifier()).thenReturn(PROCESSOR_ID); + when(processorNode.getProcessGroup()).thenReturn(processGroup); + + when(processGroup.getProcessors()).thenReturn(List.of(processorNode)); + when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME); + when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID); + + final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, ZERO_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS); + + assertNotNull(groupStatus); + assertEquals(PROCESS_GROUP_NAME, groupStatus.getName()); + assertEquals(PROCESS_GROUP_ID, groupStatus.getId()); + assertTrue(groupStatus.getProcessorStatus().isEmpty()); + + assertEquals(1, authorizables.size()); + } + + @Test + void testGetGroupStatusProcessorAuthorized() { + final List<Authorizable> authorizables = new ArrayList<>(); + + final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport(); + final Predicate<Authorizable> checkAuthorization = authorizables::add; + + when(processorNode.getName()).thenReturn(PROCESSOR_NAME); + when(processorNode.getIdentifier()).thenReturn(PROCESSOR_ID); + when(processorNode.getProcessGroup()).thenReturn(processGroup); + when(processGroup.getProcessors()).thenReturn(List.of(processorNode)); + when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME); + when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID); + + final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS); + + assertNotNull(groupStatus); + assertEquals(PROCESS_GROUP_NAME, groupStatus.getName()); + assertEquals(PROCESS_GROUP_ID, groupStatus.getId()); + + final Optional<ProcessorStatus> processorStatusFound = groupStatus.getProcessorStatus().stream().findFirst(); + assertTrue(processorStatusFound.isPresent()); + final ProcessorStatus processorStatus = processorStatusFound.get(); + assertEquals(PROCESSOR_NAME, processorStatus.getName()); + assertEquals(PROCESSOR_ID, processorStatus.getId()); + + assertEquals(2, authorizables.size()); + } + + @Test + void testGetGroupStatusNotAuthorized() { + final List<Authorizable> authorizables = new ArrayList<>(); + + final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport(); + final Predicate<Authorizable> checkAuthorization = authorizable -> { + authorizables.add(authorizable); + return false; + }; + + when(processorNode.getIdentifier()).thenReturn(PROCESSOR_ID); + when(processorNode.getProcessGroup()).thenReturn(processGroup); + when(processGroup.getProcessors()).thenReturn(List.of(processorNode)); + when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID); + + final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS); + + assertNotNull(groupStatus); + assertEquals(PROCESS_GROUP_ID, groupStatus.getName()); + assertEquals(PROCESS_GROUP_ID, groupStatus.getId()); + + final Optional<ProcessorStatus> processorStatusFound = groupStatus.getProcessorStatus().stream().findFirst(); + assertTrue(processorStatusFound.isPresent()); + final ProcessorStatus processorStatus = processorStatusFound.get(); + assertEquals(PROCESSOR_ID, processorStatus.getName()); + assertEquals(PROCESSOR_ID, processorStatus.getId()); + + assertEquals(2, authorizables.size()); + } + + private static class ConcreteEventAccess extends AbstractEventAccess { + + public ConcreteEventAccess( + final ProcessScheduler processScheduler, + final StatusAnalyticsEngine analyticsEngine, + final FlowManager flowManager, + final FlowFileEventRepository flowFileEventRepository + ) { + super(processScheduler, analyticsEngine, flowManager, flowFileEventRepository); + } + + @Override + public ProvenanceEventRepository getProvenanceRepository() { + return null; + } + + @Override + public List<Action> getFlowChanges(int firstActionId, int maxActions) { + return List.of(); + } + + @Override + public Map<String, StorageUsage> getProvenanceRepositoryStorageUsage() { + return Map.of(); + } + + @Override + public Map<String, StorageUsage> getContentRepositoryStorageUsage() { + return Map.of(); + } + + @Override + public StorageUsage getFlowFileRepositoryStorageUsage() { + return null; + } + } +}