This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 88d41556a3 NIFI-15079 Fixed ProcessorNodes missing reference to Process Group (#10406)
88d41556a3 is described below
commit 88d41556a3a41401f5ddd25f384fc94775fe7a2a
Author: David Handermann <[email protected]>
AuthorDate: Thu Oct 9 14:21:33 2025 -0500
NIFI-15079 Fixed ProcessorNodes missing reference to Process Group (#10406)
- Setting Process Group reference corrects missing Logging Attributes
---
.../nifi/controller/StandardReloadComponent.java | 2 +
.../nifi/web/dao/impl/StandardProcessorDAO.java | 5 +-
.../web/dao/impl/StandardProcessorDAOTest.java | 60 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
index 9893acec3e..bb126fde0a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -97,6 +97,7 @@ public class StandardReloadComponent implements ReloadComponent {
// attempt the creation to make sure it works before firing the
OnRemoved methods below
final String classloaderIsolationKey =
existingNode.getClassLoaderIsolationKey(processContext);
final ProcessorNode newNode =
flowController.getFlowManager().createProcessor(newType, id, bundleCoordinate,
additionalUrls, true, false, classloaderIsolationKey);
+ newNode.setProcessGroup(existingNode.getProcessGroup());
// set the new processor in the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id,
newNode.getProcessor(), new StandardLoggingContext(newNode));
@@ -151,6 +152,7 @@ public class StandardReloadComponent implements ReloadComponent {
// attempt the creation to make sure it works before firing the
OnRemoved methods below
final String classloaderIsolationKey =
existingNode.getClassLoaderIsolationKey(configurationContext);
final ControllerServiceNode newNode =
flowController.getFlowManager().createControllerService(newType, id,
bundleCoordinate, additionalUrls, true, false, classloaderIsolationKey);
+ newNode.setProcessGroup(existingNode.getProcessGroup());
// take the invocation handler that was created for new proxy and is
set to look at the new node,
// and set it to look at the existing node
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index 1a9a45a37a..97856e9ad8 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -112,12 +112,13 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
}
// get the group to add the processor to
- ProcessGroup group = locateProcessGroup(flowController, groupId);
+ final ProcessGroup group = locateProcessGroup(flowController, groupId);
try {
// attempt to create the processor
final BundleCoordinate bundleCoordinate =
BundleUtils.getBundle(flowController.getExtensionManager(),
processorDTO.getType(), processorDTO.getBundle());
- ProcessorNode processor =
flowController.getFlowManager().createProcessor(processorDTO.getType(),
processorDTO.getId(), bundleCoordinate);
+ final ProcessorNode processor =
flowController.getFlowManager().createProcessor(processorDTO.getType(),
processorDTO.getId(), bundleCoordinate);
+ processor.setProcessGroup(group);
// ensure we can perform the update before we add the processor to
the flow
verifyUpdate(processor, processorDTO);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessorDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessorDAOTest.java
index ce215fbb25..274d6d6ae9 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessorDAOTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessorDAOTest.java
@@ -16,10 +16,17 @@
*/
package org.apache.nifi.web.dao.impl;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -27,12 +34,19 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.File;
+import java.util.List;
+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
@@ -42,6 +56,10 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class StandardProcessorDAOTest {
+ private static final String BUNDLE_GROUP_ID = "org.apache.nifi";
+
+ private static final String BUNDLE_VERSION = "1.0.0";
+
@Mock
private FlowController flowController;
@@ -51,12 +69,24 @@ class StandardProcessorDAOTest {
@Mock
private ProcessorNode processorNode;
+ @Mock
+ private Processor processor;
+
@Mock
private ProcessGroup processGroup;
@Mock
private FlowManager flowManager;
+ @Mock
+ private ExtensionManager extensionManager;
+
+ @Mock
+ private StateManagerProvider stateManagerProvider;
+
+ @Mock
+ private StateManager stateManager;
+
private StandardProcessorDAO dao;
@BeforeEach
@@ -164,4 +194,34 @@ class StandardProcessorDAOTest {
// Should throw ResourceNotFoundException
assertThrows(ResourceNotFoundException.class, () ->
dao.verifyUpdate(processorDTO));
}
+
+ @Test
+ void testCreateProcessor(@TempDir final File tempDir) {
+ final String id = ProcessorDTO.class.getSimpleName();
+ final String groupId = ProcessGroup.class.getSimpleName();
+ final String processorType = ProcessorDTO.class.getCanonicalName();
+
+ final ProcessorDTO processorDTO = new ProcessorDTO();
+ processorDTO.setId(id);
+ processorDTO.setType(processorType);
+
+ final BundleCoordinate bundleCoordinate = new
BundleCoordinate(BUNDLE_GROUP_ID, processorType, BUNDLE_VERSION);
+ final BundleDetails bundleDetails = new
BundleDetails.Builder().coordinate(bundleCoordinate).workingDir(tempDir).build();
+ final Bundle bundle = new Bundle(bundleDetails,
getClass().getClassLoader());
+ final List<Bundle> bundles = List.of(bundle);
+
+ when(flowManager.getGroup(eq(groupId))).thenReturn(processGroup);
+
when(flowController.getExtensionManager()).thenReturn(extensionManager);
+ when(flowManager.createProcessor(eq(processorType), eq(id),
eq(bundleCoordinate))).thenReturn(processorNode);
+
when(extensionManager.getBundles(eq(processorType))).thenReturn(bundles);
+ when(processorNode.getProcessor()).thenReturn(processor);
+ when(processor.getIdentifier()).thenReturn(id);
+
when(flowController.getStateManagerProvider()).thenReturn(stateManagerProvider);
+ when(stateManagerProvider.getStateManager(eq(id),
any())).thenReturn(stateManager);
+
+ final ProcessorNode createdProcessorNode =
dao.createProcessor(groupId, processorDTO);
+
+ assertEquals(processorNode, createdProcessorNode);
+ verify(processorNode).setProcessGroup(processGroup);
+ }
}