This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0218e76b18 NIFI-14688 Added Auditing for Processor and Port State 
changes at the Process Group level (#10233)
0218e76b18 is described below

commit 0218e76b1867514dba47cd1191b819146603c4df
Author: NissimShiman <[email protected]>
AuthorDate: Mon Sep 1 11:50:27 2025 -0400

    NIFI-14688 Added Auditing for Processor and Port State changes at the 
Process Group level (#10233)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../org/apache/nifi/audit/ProcessGroupAuditor.java | 46 +++++++++++
 .../apache/nifi/audit/TestProcessGroupAuditor.java | 96 +++++++++++++++++++++-
 2 files changed, 141 insertions(+), 1 deletion(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
index 981937a85a..33259dac39 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
@@ -24,6 +24,9 @@ import 
org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
 import org.apache.nifi.action.details.ActionDetails;
 import org.apache.nifi.action.details.FlowChangeConfigureDetails;
 import org.apache.nifi.action.details.FlowChangeMoveDetails;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
@@ -246,6 +249,34 @@ public class ProcessGroupAuditor extends NiFiAuditor {
         }
 
         saveUpdateProcessGroupAction(groupId, operation);
+        saveActions(getComponentActions(groupId, componentIds, operation), 
logger);
+    }
+
+    private List<Action> getComponentActions(final String groupId, final 
Collection<String> componentIds, final Operation operation) {
+        final List<Action> actions = new ArrayList<>();
+        final ProcessGroupDAO processGroupDAO = getProcessGroupDAO();
+        final ProcessGroup processGroup = 
processGroupDAO.getProcessGroup(groupId);
+
+        for (String componentId : componentIds) {
+            final ProcessorNode processorNode = 
processGroup.findProcessor(componentId);
+            if (processorNode != null) {
+                actions.add(generateUpdateConnectableAction(processorNode, 
operation, Component.Processor));
+                continue;
+            }
+
+            Port port = processGroup.findInputPort(componentId);
+            if (port != null) {
+                actions.add(generateUpdateConnectableAction(port, operation, 
Component.InputPort));
+                continue;
+            }
+
+            port = processGroup.findOutputPort(componentId);
+            if (port != null) {
+                actions.add(generateUpdateConnectableAction(port, operation, 
Component.OutputPort));
+            }
+        }
+
+        return actions;
     }
 
     /**
@@ -371,6 +402,21 @@ public class ProcessGroupAuditor extends NiFiAuditor {
         saveAction(action, logger);
     }
 
+    private Action generateUpdateConnectableAction(final Connectable 
connectable, final Operation operation, final Component component) {
+        final FlowChangeAction action = createFlowChangeAction();
+        action.setSourceId(connectable.getIdentifier());
+        action.setSourceName(connectable.getName());
+        action.setSourceType(component);
+        action.setOperation(operation);
+
+        if (component == Component.Processor) {
+            FlowChangeExtensionDetails componentDetails = new 
FlowChangeExtensionDetails();
+            componentDetails.setType(connectable.getComponentType());
+            action.setComponentDetails(componentDetails);
+        }
+        return action;
+    }
+
     private void saveUpdateControllerServiceAction(final ControllerServiceNode 
csNode, final Operation operation) throws Throwable {
         final FlowChangeAction action = createFlowChangeAction();
         action.setSourceId(csNode.getIdentifier());
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
index 07c8676367..0e8d78b1a3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
@@ -22,6 +22,7 @@ import org.apache.nifi.action.FlowChangeAction;
 import org.apache.nifi.action.Operation;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -52,6 +53,7 @@ import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -73,6 +75,8 @@ public class TestProcessGroupAuditor {
     private static final String PG_1 = "processGroup1";
     private static final String PROC_1 = "processor1";
     private static final String PROC_2 = "processor2";
+    private static final String INPUT_PORT = "inputPort";
+    private static final String OUTPUT_PORT = "outputPort";
     private static final String CS_1 = "controllerService1";
     private static final String USER_ID = "user-id";
 
@@ -110,7 +114,7 @@ public class TestProcessGroupAuditor {
     }
 
     @Test
-    void testVerifyProcessGroupAuditing() {
+    void testVerifyStartProcessGroupAuditing() {
         final ProcessorNode processor1 = mock(StandardProcessorNode.class);
         final ProcessorNode processor2 = mock(StandardProcessorNode.class);
         when(processor1.getProcessGroup()).thenReturn(processGroup);
@@ -135,6 +139,96 @@ public class TestProcessGroupAuditor {
         assertEquals(Operation.Start, action.getOperation());
     }
 
+    @Test
+    void testVerifyEnableProcessGroupAuditing() {
+        final ProcessorNode processor1 = mock(StandardProcessorNode.class);
+        final ProcessorNode processor2 = mock(StandardProcessorNode.class);
+        final Port inputPort = mock(Port.class);
+        final Port outputPort = mock(Port.class);
+
+        when(processor1.getName()).thenReturn(PROC_1);
+        when(processor1.getProcessGroup()).thenReturn(processGroup);
+        
when(processor1.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        when(processor2.getName()).thenReturn(PROC_2);
+        when(processor2.getProcessGroup()).thenReturn(processGroup);
+        
when(processor2.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+
+        when(inputPort.getName()).thenReturn(INPUT_PORT);
+        when(inputPort.getProcessGroup()).thenReturn(processGroup);
+        
when(inputPort.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT);
+
+        when(outputPort.getName()).thenReturn(OUTPUT_PORT);
+        when(outputPort.getProcessGroup()).thenReturn(processGroup);
+        
when(outputPort.getConnectableType()).thenReturn(ConnectableType.OUTPUT_PORT);
+
+        when(processGroup.findProcessor(PROC_1)).thenReturn(processor1);
+        when(processGroup.findProcessor(PROC_2)).thenReturn(processor2);
+        when(processGroup.findProcessor(INPUT_PORT)).thenReturn(null);
+        when(processGroup.findProcessor(OUTPUT_PORT)).thenReturn(null);
+        when(processGroup.findInputPort(INPUT_PORT)).thenReturn(inputPort);
+        when(processGroup.findInputPort(OUTPUT_PORT)).thenReturn(null);
+        when(processGroup.findOutputPort(OUTPUT_PORT)).thenReturn(outputPort);
+
+        when(flowManager.getGroup(eq(PG_1))).thenReturn(processGroup);
+        when(flowManager.findConnectable(eq(PROC_1))).thenReturn(processor1);
+        when(flowManager.findConnectable(eq(PROC_2))).thenReturn(processor2);
+        
when(flowManager.findConnectable(eq(INPUT_PORT))).thenReturn(inputPort);
+        
when(flowManager.findConnectable(eq(OUTPUT_PORT))).thenReturn(outputPort);
+        when(flowController.getFlowManager()).thenReturn(flowManager);
+
+        processGroupDAO.enableComponents(PG_1, ScheduledState.STOPPED, new 
HashSet<>(Arrays.asList(PROC_1, PROC_2, INPUT_PORT, OUTPUT_PORT)));
+
+        verify(auditService, 
times(2)).addActions(argumentCaptorActions.capture());
+        final List<List<Action>> actions = 
argumentCaptorActions.getAllValues();
+        assertEquals(2, actions.size());
+        final Iterator<List<Action>> actionsIterator = actions.iterator();
+
+        // pg enabled
+        final List<Action> pgActions = actionsIterator.next();
+        assertEquals(1, pgActions.size());
+        final Action pgAction = pgActions.iterator().next();
+        assertInstanceOf(FlowChangeAction.class, pgAction);
+        assertEquals(USER_ID, pgAction.getUserIdentity());
+        assertEquals("ProcessGroup", pgAction.getSourceType().name());
+        assertEquals(Operation.Enable, pgAction.getOperation());
+
+        List<Action> componentActions = actionsIterator.next();
+        assertEquals(4, componentActions.size());
+        componentActions.sort(Comparator.comparing(Action::getSourceName));
+
+        // inputPort enabled
+        final Iterator<Action> actionIterator = componentActions.iterator();
+        Action componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("InputPort", componentAction.getSourceType().name());
+        assertEquals(INPUT_PORT, componentAction.getSourceName());
+        assertEquals(Operation.Enable, componentAction.getOperation());
+
+        // outputPort enabled
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("OutputPort", componentAction.getSourceType().name());
+        assertEquals(OUTPUT_PORT, componentAction.getSourceName());
+        assertEquals(Operation.Enable, componentAction.getOperation());
+
+        // processors enabled
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("Processor", componentAction.getSourceType().name());
+        assertEquals(PROC_1, componentAction.getSourceName());
+        assertEquals(Operation.Enable, componentAction.getOperation());
+
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("Processor", componentAction.getSourceType().name());
+        assertEquals(PROC_2, componentAction.getSourceName());
+        assertEquals(Operation.Enable, componentAction.getOperation());
+    }
+
     @Test
     void testVerifyEnableControllerServicesAuditing() {
         final ControllerServiceNode cs = 
mock(StandardControllerServiceNode.class);

Reply via email to