This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0218e76b18 NIFI-14688 Added Auditing for Processor and Port State
changes at the Process Group level (#10233)
0218e76b18 is described below
commit 0218e76b1867514dba47cd1191b819146603c4df
Author: NissimShiman <[email protected]>
AuthorDate: Mon Sep 1 11:50:27 2025 -0400
NIFI-14688 Added Auditing for Processor and Port State changes at the
Process Group level (#10233)
Signed-off-by: David Handermann <[email protected]>
---
.../org/apache/nifi/audit/ProcessGroupAuditor.java | 46 +++++++++++
.../apache/nifi/audit/TestProcessGroupAuditor.java | 96 +++++++++++++++++++++-
2 files changed, 141 insertions(+), 1 deletion(-)
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
index 981937a85a..33259dac39 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
@@ -24,6 +24,9 @@ import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.action.details.FlowChangeMoveDetails;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
@@ -246,6 +249,34 @@ public class ProcessGroupAuditor extends NiFiAuditor {
}
saveUpdateProcessGroupAction(groupId, operation);
+ saveActions(getComponentActions(groupId, componentIds, operation),
logger);
+ }
+
+ private List<Action> getComponentActions(final String groupId, final
Collection<String> componentIds, final Operation operation) { // builds one audit Action for each id in componentIds that resolves to a processor, input port or output port
+ final List<Action> actions = new ArrayList<>();
+ final ProcessGroupDAO processGroupDAO = getProcessGroupDAO();
+ final ProcessGroup processGroup =
processGroupDAO.getProcessGroup(groupId); // NOTE(review): dereferenced below without a null check - confirm the DAO never returns null for groupId
+
+ for (String componentId : componentIds) {
+ final ProcessorNode processorNode =
processGroup.findProcessor(componentId); // first try the id as a processor
+ if (processorNode != null) {
+ actions.add(generateUpdateConnectableAction(processorNode,
operation, Component.Processor));
+ continue; // matched a processor, so the port lookups are skipped
+ }
+
+ Port port = processGroup.findInputPort(componentId); // next try the id as an input port
+ if (port != null) {
+ actions.add(generateUpdateConnectableAction(port, operation,
Component.InputPort));
+ continue; // matched an input port, so the output port lookup is skipped
+ }
+
+ port = processGroup.findOutputPort(componentId); // finally try the id as an output port
+ if (port != null) {
+ actions.add(generateUpdateConnectableAction(port, operation,
Component.OutputPort));
+ } // an id that matches none of the three lookups contributes no action
+ }
+
+ return actions; // empty when no id resolved to a processor or port
}
/**
@@ -371,6 +402,21 @@ public class ProcessGroupAuditor extends NiFiAuditor {
saveAction(action, logger);
}
+ private Action generateUpdateConnectableAction(final Connectable
connectable, final Operation operation, final Component component) { // creates, but does not save, an audit action describing 'operation' on the given connectable
+ final FlowChangeAction action = createFlowChangeAction();
+ action.setSourceId(connectable.getIdentifier());
+ action.setSourceName(connectable.getName());
+ action.setSourceType(component); // source type comes from the caller-supplied Component, not from the connectable itself
+ action.setOperation(operation);
+
+ if (component == Component.Processor) { // only processors get extension details; port actions carry no component details
+ FlowChangeExtensionDetails componentDetails = new
FlowChangeExtensionDetails();
+ componentDetails.setType(connectable.getComponentType());
+ action.setComponentDetails(componentDetails);
+ }
+ return action; // the caller is responsible for persisting the returned action
+ }
+
private void saveUpdateControllerServiceAction(final ControllerServiceNode
csNode, final Operation operation) throws Throwable {
final FlowChangeAction action = createFlowChangeAction();
action.setSourceId(csNode.getIdentifier());
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
index 07c8676367..0e8d78b1a3 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
@@ -22,6 +22,7 @@ import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@@ -52,6 +53,7 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -73,6 +75,8 @@ public class TestProcessGroupAuditor {
private static final String PG_1 = "processGroup1";
private static final String PROC_1 = "processor1";
private static final String PROC_2 = "processor2";
+ private static final String INPUT_PORT = "inputPort";
+ private static final String OUTPUT_PORT = "outputPort";
private static final String CS_1 = "controllerService1";
private static final String USER_ID = "user-id";
@@ -110,7 +114,7 @@ public class TestProcessGroupAuditor {
}
@Test
- void testVerifyProcessGroupAuditing() {
+ void testVerifyStartProcessGroupAuditing() {
final ProcessorNode processor1 = mock(StandardProcessorNode.class);
final ProcessorNode processor2 = mock(StandardProcessorNode.class);
when(processor1.getProcessGroup()).thenReturn(processGroup);
@@ -135,6 +139,96 @@ public class TestProcessGroupAuditor {
assertEquals(Operation.Start, action.getOperation());
}
+ @Test
+ void testVerifyEnableProcessGroupAuditing() { // checks that enableComponents records one group-level action plus one action per processor and port
+ final ProcessorNode processor1 = mock(StandardProcessorNode.class);
+ final ProcessorNode processor2 = mock(StandardProcessorNode.class);
+ final Port inputPort = mock(Port.class);
+ final Port outputPort = mock(Port.class);
+
+ when(processor1.getName()).thenReturn(PROC_1);
+ when(processor1.getProcessGroup()).thenReturn(processGroup);
+
when(processor1.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+ when(processor2.getName()).thenReturn(PROC_2);
+ when(processor2.getProcessGroup()).thenReturn(processGroup);
+
when(processor2.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+
+ when(inputPort.getName()).thenReturn(INPUT_PORT);
+ when(inputPort.getProcessGroup()).thenReturn(processGroup);
+
when(inputPort.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT);
+
+ when(outputPort.getName()).thenReturn(OUTPUT_PORT);
+ when(outputPort.getProcessGroup()).thenReturn(processGroup);
+
when(outputPort.getConnectableType()).thenReturn(ConnectableType.OUTPUT_PORT);
+
+ when(processGroup.findProcessor(PROC_1)).thenReturn(processor1);
+ when(processGroup.findProcessor(PROC_2)).thenReturn(processor2);
+ when(processGroup.findProcessor(INPUT_PORT)).thenReturn(null); // port ids must miss the processor lookup so the port lookups are reached
+ when(processGroup.findProcessor(OUTPUT_PORT)).thenReturn(null);
+ when(processGroup.findInputPort(INPUT_PORT)).thenReturn(inputPort);
+ when(processGroup.findInputPort(OUTPUT_PORT)).thenReturn(null); // the output port id must also miss the input port lookup
+ when(processGroup.findOutputPort(OUTPUT_PORT)).thenReturn(outputPort);
+
+ when(flowManager.getGroup(eq(PG_1))).thenReturn(processGroup);
+ when(flowManager.findConnectable(eq(PROC_1))).thenReturn(processor1);
+ when(flowManager.findConnectable(eq(PROC_2))).thenReturn(processor2);
+
when(flowManager.findConnectable(eq(INPUT_PORT))).thenReturn(inputPort);
+
when(flowManager.findConnectable(eq(OUTPUT_PORT))).thenReturn(outputPort);
+ when(flowController.getFlowManager()).thenReturn(flowManager);
+
+ processGroupDAO.enableComponents(PG_1, ScheduledState.STOPPED, new
HashSet<>(Arrays.asList(PROC_1, PROC_2, INPUT_PORT, OUTPUT_PORT))); // the call under test; the assertions below expect it to be audited as Operation.Enable
+
+ verify(auditService,
times(2)).addActions(argumentCaptorActions.capture()); // two addActions calls are expected: the group action first, then the component actions
+ final List<List<Action>> actions =
argumentCaptorActions.getAllValues();
+ assertEquals(2, actions.size());
+ final Iterator<List<Action>> actionsIterator = actions.iterator();
+
+ // pg enabled
+ final List<Action> pgActions = actionsIterator.next();
+ assertEquals(1, pgActions.size());
+ final Action pgAction = pgActions.iterator().next();
+ assertInstanceOf(FlowChangeAction.class, pgAction);
+ assertEquals(USER_ID, pgAction.getUserIdentity());
+ assertEquals("ProcessGroup", pgAction.getSourceType().name());
+ assertEquals(Operation.Enable, pgAction.getOperation());
+
+ List<Action> componentActions = actionsIterator.next();
+ assertEquals(4, componentActions.size());
+ componentActions.sort(Comparator.comparing(Action::getSourceName)); // ids were passed in a HashSet, so sort by name for a stable order: inputPort, outputPort, processor1, processor2
+
+ // inputPort enabled
+ final Iterator<Action> actionIterator = componentActions.iterator();
+ Action componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("InputPort", componentAction.getSourceType().name());
+ assertEquals(INPUT_PORT, componentAction.getSourceName());
+ assertEquals(Operation.Enable, componentAction.getOperation());
+
+ // outputPort enabled
+ componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("OutputPort", componentAction.getSourceType().name());
+ assertEquals(OUTPUT_PORT, componentAction.getSourceName());
+ assertEquals(Operation.Enable, componentAction.getOperation());
+
+ // processors enabled
+ componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("Processor", componentAction.getSourceType().name());
+ assertEquals(PROC_1, componentAction.getSourceName());
+ assertEquals(Operation.Enable, componentAction.getOperation());
+
+ componentAction = actionIterator.next(); // fourth and last sorted entry: processor2
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("Processor", componentAction.getSourceType().name());
+ assertEquals(PROC_2, componentAction.getSourceName());
+ assertEquals(Operation.Enable, componentAction.getOperation());
+ }
+
@Test
void testVerifyEnableControllerServicesAuditing() {
final ControllerServiceNode cs =
mock(StandardControllerServiceNode.class);