exceptionfactory commented on code in PR #10753:
URL: https://github.com/apache/nifi/pull/10753#discussion_r2770935180
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -328,6 +328,12 @@ private void setupEnvironment() throws IOException {
return;
}
+ // Check if we've been shutdown before starting venv creation
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -328,6 +328,12 @@ private void setupEnvironment() throws IOException {
return;
}
+ // Check if we've been shutdown before starting venv creation
+ if (isShutdown()) {
+ logger.info("Not creating Python Virtual Environment for {}
because process is shutting down", componentId);
Review Comment:
The log message is not needed, since an exception is being thrown.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -346,17 +352,33 @@ private void setupEnvironment() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Creating Python Virtual Environment {} using command
{}", virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process venvProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
+ // Wait for the venv creation with periodic checks for shutdown
+ // This allows the venv creation to be interrupted when the
process is being shut down
+ while (!venvProcess.waitFor(1, TimeUnit.SECONDS)) {
+ if (isShutdown()) {
+ logger.info("Interrupting Python Virtual Environment
creation for {} due to shutdown", componentId);
+ venvProcess.destroyForcibly();
+ throw new IOException("Python process %s shutdown during
virtual environment creation".formatted(componentId));
+ }
+ }
+
+ final int result = venvProcess.exitValue();
+ if (result != 0) {
+ throw new IOException("Failed to create Python Environment " +
virtualEnvHome + ": process existed with code " + result);
Review Comment:
Recommend reformatting using `.formatted()` like other messages
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -346,17 +352,33 @@ private void setupEnvironment() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Creating Python Virtual Environment {} using command
{}", virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process venvProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
+ // Wait for the venv creation with periodic checks for shutdown
+ // This allows the venv creation to be interrupted when the
process is being shut down
+ while (!venvProcess.waitFor(1, TimeUnit.SECONDS)) {
+ if (isShutdown()) {
+ logger.info("Interrupting Python Virtual Environment
creation for {} due to shutdown", componentId);
+ venvProcess.destroyForcibly();
+ throw new IOException("Python process %s shutdown during
virtual environment creation".formatted(componentId));
+ }
+ }
+
+ final int result = venvProcess.exitValue();
+ if (result != 0) {
+ throw new IOException("Failed to create Python Environment " +
virtualEnvHome + ": process existed with code " + result);
+ }
} catch (final InterruptedException e) {
- throw new IOException("Interrupted while waiting for Python
virtual environment to be created");
+ // If interrupted, destroy the venv creation process and propagate
+ venvProcess.destroyForcibly();
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for Python
virtual environment to be created", e);
}
- if (result != 0) {
- throw new IOException("Failed to create Python Environment " +
virtualEnvHome + ": process existed with code " + result);
+ // Check shutdown again before proceeding
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -529,6 +569,26 @@ public int getProcessorCount() {
return processorPrefersIsolation.size();
}
+ /**
+ * Returns true if this process is being started for the given component
identifier.
+ * This is used to identify processes that are still in the startup phase
(venv creation)
+ * and haven't yet created any processors.
+ *
+ * @param identifier the component identifier
+ * @return true if this process is starting for the given identifier
+ */
+ public boolean isStartingFor(final String identifier) {
Review Comment:
The name and description of this method do not seem to align with its
behavior. "Starting" is one reading, but since the method only checks the
processor count, a more precise name would be something like `isNotRunning`.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/StandardPythonProcessorBridgeTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.py4j;
+
+import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
+import org.apache.nifi.python.PythonController;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for StandardPythonProcessorBridge cancellation functionality.
+ */
+class StandardPythonProcessorBridgeTest {
+
+ @Mock
+ private PythonController controller;
+
+ @Mock
+ private ProcessorCreationWorkflow creationWorkflow;
+
+ @TempDir
+ private Path tempDir;
+
+ private StandardPythonProcessorBridge bridge;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ MockitoAnnotations.openMocks(this);
+
+ // Create a temporary module file
+ final File moduleFile =
Files.createFile(tempDir.resolve("test_processor.py")).toFile();
+ final File workDir =
Files.createDirectory(tempDir.resolve("work")).toFile();
+
+ bridge = new StandardPythonProcessorBridge.Builder()
+ .controller(controller)
+ .creationWorkflow(creationWorkflow)
+ .processorType("TestProcessor")
+ .processorVersion("1.0.0")
+ .workingDirectory(workDir)
+ .moduleFile(moduleFile)
+ .build();
+ }
+
+ @Test
+ void testCancelSetsFlag() {
+ assertFalse(bridge.isCanceled(), "Bridge should not be canceled
initially");
+
+ bridge.cancel();
+
+ assertTrue(bridge.isCanceled(), "Bridge should be canceled after
cancel() is called");
+ }
+
+ @Test
+ void testCancelSetsLoadStateToCanceled() {
+ // Initially should be in some non-canceled state
+ LoadState initialState = bridge.getLoadState();
+ assertFalse(initialState == LoadState.CANCELED, "Initial state should
not be CANCELED");
+
+ bridge.cancel();
+
+ assertEquals(LoadState.CANCELED, bridge.getLoadState(),
+ "Load state should be CANCELED after cancel() is called");
+ }
+
+ @Test
+ void testCancelIsIdempotent() {
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+
+ // Call cancel again - should not throw or change state
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+
+ // And again
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+ }
+
+ @Test
+ void testIsCanceledReturnsFalseInitially() {
+ assertFalse(bridge.isCanceled(), "isCanceled() should return false
before cancel() is called");
+ }
+
+ @Test
+ void testIsCanceledReturnsTrueAfterCancel() {
+ bridge.cancel();
+ assertTrue(bridge.isCanceled(), "isCanceled() should return true after
cancel() is called");
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.SECONDS)
+ void testCancelFromDifferentThread() throws InterruptedException {
+ final CountDownLatch cancelLatch = new CountDownLatch(1);
+ final AtomicBoolean canceledFromThread = new AtomicBoolean(false);
+
+ Thread cancelThread = new Thread(() -> {
+ bridge.cancel();
+ canceledFromThread.set(bridge.isCanceled());
+ cancelLatch.countDown();
+ });
+
+ cancelThread.start();
+ assertTrue(cancelLatch.await(5, TimeUnit.SECONDS), "Cancel should
complete within timeout");
+ assertTrue(canceledFromThread.get(), "Bridge should be canceled from
another thread");
+ assertTrue(bridge.isCanceled(), "Bridge should be canceled as seen
from main thread");
+ }
+
+ @Test
+ void testLoadStateTransitionsToCanceledOnCancel() {
+ // Get initial state
+ LoadState beforeCancel = bridge.getLoadState();
+
+ // Cancel
+ bridge.cancel();
+
+ // Verify state changed to CANCELED
+ LoadState afterCancel = bridge.getLoadState();
+ assertEquals(LoadState.CANCELED, afterCancel,
+ "Load state should transition to CANCELED, was: " +
beforeCancel);
+ }
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.SECONDS)
+ void testConcurrentCancelCalls() throws InterruptedException {
+ final int numThreads = 10;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+ final AtomicReference<Exception> exception = new AtomicReference<>();
+
+ for (int i = 0; i < numThreads; i++) {
+ new Thread(() -> {
+ try {
+ startLatch.await();
+ bridge.cancel();
+ } catch (Exception e) {
+ exception.set(e);
+ } finally {
+ doneLatch.countDown();
+ }
+ }).start();
+ }
+
+ // Start all threads at once
+ startLatch.countDown();
+
+ // Wait for all to complete
+ assertTrue(doneLatch.await(10, TimeUnit.SECONDS), "All threads should
complete");
+
+ // Verify no exceptions occurred
+ if (exception.get() != null) {
+ throw new AssertionError("Exception during concurrent cancel",
exception.get());
+ }
Review Comment:
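`assertNull(exception.get(), ...)` should be used to verify that no exception occurred, instead of the manual null check that throws an `AssertionError`.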
##########
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarDeletionDuringInitIT.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.python;
+
+import org.apache.nifi.nar.NarState;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.tests.system.nar.NarUploadUtil;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.dto.NarSummaryDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for Python NAR deletion during processor initialization.
+ * These tests verify that:
+ * 1. Force-deleting a NAR while a Python processor is initializing does not
block indefinitely
+ * 2. The processor initialization is properly canceled when the NAR is deleted
+ * 3. The system remains in a consistent state after the operation
+ */
+public class PythonNarDeletionDuringInitIT extends NiFiSystemIT {
+
+ private static final Logger logger =
LoggerFactory.getLogger(PythonNarDeletionDuringInitIT.class);
+
+ private static final String PYTHON_TEXT_EXTENSIONS_NAR_ID =
"nifi-python-test-extensions-nar";
+ private static final String PYTHON_WRITE_BECH_32_CHARSET =
"WriteBech32Charset";
+
+ // Maximum time to wait for NAR deletion - should complete quickly with
interruptibility fix
+ private static final int NAR_DELETE_TIMEOUT_SECONDS = 30;
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createPythonicInstanceFactory();
+ }
+
+
+ /**
+ * Tests that force-deleting a NAR immediately after creating a processor
+ * does not block indefinitely. The deletion should complete within a
reasonable time
+ * even if the processor is still initializing (creating venv, downloading
dependencies, etc.).
+ */
+ @Test
+ @Timeout(value = 120, unit = TimeUnit.SECONDS)
+ public void testForceDeleteNarDuringProcessorInitialization() throws
NiFiClientException, IOException, InterruptedException {
+ final NarUploadUtil narUploadUtil = new NarUploadUtil(getNifiClient());
+
+ // Get the Python NAR file
+ final File pythonTestExtensionsNar = getPythonTestExtensionsNar();
+ assertNotNull(pythonTestExtensionsNar, "Python test extensions NAR not
found");
+
+ // Upload the NAR and wait for it to be installed
+ final NarSummaryDTO uploadedNarSummary =
narUploadUtil.uploadNar(pythonTestExtensionsNar);
+
waitFor(narUploadUtil.getWaitForNarStateSupplier(uploadedNarSummary.getIdentifier(),
NarState.INSTALLED));
+ logger.info("NAR uploaded and installed: {}",
uploadedNarSummary.getIdentifier());
+
+ // Get the processor type info
+ final DocumentedTypeDTO processorTypeDTO =
getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET);
+ assertNotNull(processorTypeDTO, "Processor type not found after NAR
installation");
+ final BundleDTO processorBundle = processorTypeDTO.getBundle();
+
+ // Create a Python processor - this triggers async initialization
+ final ProcessorEntity pythonProcessor =
getClientUtil().createProcessor(
+ PYTHON_WRITE_BECH_32_CHARSET,
+ processorBundle.getGroup(),
+ processorBundle.getArtifact(),
+ processorBundle.getVersion());
+ logger.info("Created processor: {}", pythonProcessor.getId());
+
+ // Immediately force-delete the NAR while the processor is still
initializing
+ // This should NOT block indefinitely - the initialization should be
interruptible
+ logger.info("Force-deleting NAR while processor is initializing...");
+
+ final CompletableFuture<NarSummaryDTO> deleteFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ return narUploadUtil.deleteNar(uploadedNarSummary);
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to delete NAR", e);
+ }
+ });
+
+ // The deletion should complete within the timeout - this is the key
assertion
+ // Before the fix, this would block indefinitely waiting for venv
creation to complete
+ assertDoesNotThrow(() -> {
+ try {
+ final NarSummaryDTO deletedNar =
deleteFuture.get(NAR_DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ logger.info("NAR deleted successfully: {}",
deletedNar.getIdentifier());
+ } catch (final TimeoutException e) {
+ throw new AssertionError(
+ "NAR deletion blocked for more than " +
NAR_DELETE_TIMEOUT_SECONDS + " seconds. " +
+ "This indicates the initialization was not properly
interrupted.", e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException("NAR deletion failed with
exception", e.getCause());
+ }
+ }, "NAR deletion should complete within " + NAR_DELETE_TIMEOUT_SECONDS
+ " seconds");
+
+ // Verify the NAR is gone
+ narUploadUtil.verifyNarSummaries(0);
+
+ // Verify the processor type is no longer available
+ assertNull(getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET),
+ "Processor type should not be available after NAR deletion");
+
+ // Verify the processor is now ghosted (extension missing)
+ final String pythonProcessorId = pythonProcessor.getId();
+ waitFor(() -> {
+ final ProcessorEntity processorAfterDelete =
getNifiClient().getProcessorClient().getProcessor(pythonProcessorId);
+ logger.info("Waiting for processor {} to be considered missing",
pythonProcessorId);
+ return processorAfterDelete.getComponent().getExtensionMissing();
+ });
+
+ logger.info("Test completed successfully - NAR deletion did not block
during processor initialization");
+ }
+
+ /**
+ * Tests that after a NAR is force-deleted during initialization and then
re-uploaded,
+ * a new processor can be created and functions correctly.
+ */
+ @Test
+ @Timeout(value = 180, unit = TimeUnit.SECONDS)
+ public void testNarReuploadAfterForceDeleteDuringInit() throws
NiFiClientException, IOException, InterruptedException {
+ final NarUploadUtil narUploadUtil = new NarUploadUtil(getNifiClient());
+
+ // Get the Python NAR file
+ final File pythonTestExtensionsNar = getPythonTestExtensionsNar();
+ assertNotNull(pythonTestExtensionsNar);
+
+ // Phase 1: Upload NAR, create processor, force-delete NAR during init
+ logger.info("Phase 1: Upload NAR, create processor, force-delete
during init");
+
+ final NarSummaryDTO uploadedNarSummary =
narUploadUtil.uploadNar(pythonTestExtensionsNar);
+
waitFor(narUploadUtil.getWaitForNarStateSupplier(uploadedNarSummary.getIdentifier(),
NarState.INSTALLED));
+
+ final DocumentedTypeDTO processorTypeDTO =
getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET);
+ final BundleDTO processorBundle = processorTypeDTO.getBundle();
+
+ // Create processor (triggers initialization)
+ final ProcessorEntity firstProcessor = getClientUtil().createProcessor(
+ PYTHON_WRITE_BECH_32_CHARSET,
+ processorBundle.getGroup(),
+ processorBundle.getArtifact(),
+ processorBundle.getVersion());
+
+ // Immediately force-delete NAR
+ final CompletableFuture<NarSummaryDTO> deleteFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ return narUploadUtil.deleteNar(uploadedNarSummary);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Wait for deletion (should not timeout)
+ try {
+ deleteFuture.get(NAR_DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (final TimeoutException | ExecutionException e) {
+ throw new AssertionError("NAR deletion should not block during
processor initialization", e);
+ }
+
+ // Wait for the first processor to become ghosted
+ final String firstProcessorId = firstProcessor.getId();
+ waitFor(() -> {
+ final ProcessorEntity proc =
getNifiClient().getProcessorClient().getProcessor(firstProcessorId);
+ return proc.getComponent().getExtensionMissing();
+ });
+
+ // Delete the ghosted processor to clean up
+ getNifiClient().getProcessorClient().deleteProcessor(firstProcessor);
+
+ // Phase 2: Re-upload NAR and verify new processor works
+ logger.info("Phase 2: Re-upload NAR and verify new processor works");
+
+ final NarSummaryDTO reuploadedNarSummary =
narUploadUtil.uploadNar(pythonTestExtensionsNar);
+
waitFor(narUploadUtil.getWaitForNarStateSupplier(reuploadedNarSummary.getIdentifier(),
NarState.INSTALLED));
+
+ // Verify processor type is available again
+ final DocumentedTypeDTO reloadedProcessorType =
getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET);
+ assertNotNull(reloadedProcessorType, "Processor type should be
available after NAR re-upload");
+
+ // Create a new processor and verify it initializes successfully
+ final ProcessorEntity secondProcessor =
getClientUtil().createProcessor(
+ PYTHON_WRITE_BECH_32_CHARSET,
+ reloadedProcessorType.getBundle().getGroup(),
+ reloadedProcessorType.getBundle().getArtifact(),
+ reloadedProcessorType.getBundle().getVersion());
+
+ // Wait for the processor to be loaded (not ghosted) - we don't need
it to be fully valid
+ // since that would require wiring up connections
+ final String secondProcessorId = secondProcessor.getId();
+ waitFor(() -> {
+ final ProcessorEntity proc =
getNifiClient().getProcessorClient().getProcessor(secondProcessorId);
+ // Processor is loaded when it's not marked as extension missing
+ return !proc.getComponent().getExtensionMissing();
+ });
+
+ // Verify the processor is not ghosted
+ final ProcessorEntity loadedProcessor =
getNifiClient().getProcessorClient().getProcessor(secondProcessorId);
+ assertFalse(loadedProcessor.getComponent().getExtensionMissing(),
+ "Processor should not be marked as extension missing after NAR
re-upload");
+
+ // Clean up
+ getNifiClient().getProcessorClient().deleteProcessor(loadedProcessor);
+ narUploadUtil.deleteNar(reuploadedNarSummary);
+
+ logger.info("Test completed successfully - NAR re-upload after
force-delete works correctly");
+ }
+
+ /**
+ * Tests rapid NAR upload/delete cycles to verify the system remains stable
+ * and doesn't accumulate orphaned resources.
+ */
+ @Test
+ @Timeout(value = 240, unit = TimeUnit.SECONDS)
+ public void testRapidNarUploadDeleteCycles() throws NiFiClientException,
IOException, InterruptedException {
+ final NarUploadUtil narUploadUtil = new NarUploadUtil(getNifiClient());
+ final File pythonTestExtensionsNar = getPythonTestExtensionsNar();
+ assertNotNull(pythonTestExtensionsNar);
+
+ final int cycles = 3;
+ logger.info("Running {} rapid NAR upload/delete cycles", cycles);
+
+ for (int i = 0; i < cycles; i++) {
+ logger.info("Cycle {} of {}", i + 1, cycles);
+
+ // Upload NAR
+ final NarSummaryDTO narSummary =
narUploadUtil.uploadNar(pythonTestExtensionsNar);
+
waitFor(narUploadUtil.getWaitForNarStateSupplier(narSummary.getIdentifier(),
NarState.INSTALLED));
+
+ final DocumentedTypeDTO processorType =
getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET);
+ assertNotNull(processorType, "Processor type should be available");
+
+ // Create processor
+ final ProcessorEntity processor = getClientUtil().createProcessor(
+ PYTHON_WRITE_BECH_32_CHARSET,
+ processorType.getBundle().getGroup(),
+ processorType.getBundle().getArtifact(),
+ processorType.getBundle().getVersion());
+
+ // Force-delete NAR immediately
+ final CompletableFuture<NarSummaryDTO> deleteFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ return narUploadUtil.deleteNar(narSummary);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Verify deletion completes without blocking
+ try {
+ deleteFuture.get(NAR_DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (final TimeoutException e) {
+ throw new AssertionError("Cycle " + (i + 1) + ": NAR deletion
blocked", e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException("Cycle " + (i + 1) + ": NAR
deletion failed", e.getCause());
+ }
+
+ // Wait for processor to become ghosted
+ final String processorId = processor.getId();
+ waitFor(() -> {
+ final ProcessorEntity proc =
getNifiClient().getProcessorClient().getProcessor(processorId);
+ return proc.getComponent().getExtensionMissing();
+ });
+
+ // Clean up the ghosted processor
+ getNifiClient().getProcessorClient().deleteProcessor(processor);
+
+ // Verify no NARs remain
+ assertEquals(0,
narUploadUtil.verifyNarSummaries(0).getNarSummaries().size());
+ }
+
+ logger.info("All {} cycles completed successfully", cycles);
+ }
+
+ /**
+ * Tests that the processor load state transitions correctly when
initialization is canceled.
+ */
+ @Test
Review Comment:
Most of the comments in this class do not add significant value and should
be removed.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/StandardNarLoader.java:
##########
@@ -126,6 +126,7 @@ public synchronized NarLoadResult load(final
Collection<File> narFiles, final Se
@Override
public synchronized void unload(final Collection<Bundle> bundles) {
+ LOGGER.debug("Starting unload of {} bundles", bundles.size());
Review Comment:
Recommend removing the debug logs from this method.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -377,17 +405,26 @@ private void installDebugPy() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Installing DebugPy to Virtual Env {} using command {}",
virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process pipProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
- } catch (final InterruptedException e) {
- throw new IOException("Interrupted while waiting for DebugPy to be
installed");
- }
+ // Wait for pip install with periodic checks for shutdown
+ while (!pipProcess.waitFor(1, TimeUnit.SECONDS)) {
+ if (isShutdown()) {
+ logger.info("Interrupting DebugPy installation for {} due
to shutdown", componentId);
+ pipProcess.destroyForcibly();
+ return;
+ }
+ }
- if (result != 0) {
- throw new IOException("Failed to install DebugPy for Python
Environment " + virtualEnvHome + ": process existed with code " + result);
+ final int result = pipProcess.exitValue();
+ if (result != 0) {
+ throw new IOException("Failed to install DebugPy for Python
Environment " + virtualEnvHome + ": process existed with code " + result);
+ }
+ } catch (final InterruptedException e) {
+ pipProcess.destroyForcibly();
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for DebugPy to be
installed", e);
Review Comment:
Some identification, such as the component identifier or the virtual environment path, should be included in the message.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -346,17 +352,33 @@ private void setupEnvironment() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Creating Python Virtual Environment {} using command
{}", virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process venvProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
+ // Wait for the venv creation with periodic checks for shutdown
+ // This allows the venv creation to be interrupted when the
process is being shut down
+ while (!venvProcess.waitFor(1, TimeUnit.SECONDS)) {
+ if (isShutdown()) {
+ logger.info("Interrupting Python Virtual Environment
creation for {} due to shutdown", componentId);
+ venvProcess.destroyForcibly();
+ throw new IOException("Python process %s shutdown during
virtual environment creation".formatted(componentId));
+ }
+ }
+
+ final int result = venvProcess.exitValue();
+ if (result != 0) {
+ throw new IOException("Failed to create Python Environment " +
virtualEnvHome + ": process existed with code " + result);
+ }
} catch (final InterruptedException e) {
- throw new IOException("Interrupted while waiting for Python
virtual environment to be created");
+ // If interrupted, destroy the venv creation process and propagate
+ venvProcess.destroyForcibly();
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for Python
virtual environment to be created", e);
Review Comment:
Recommend including the virtual environment home path in the message.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -346,17 +352,33 @@ private void setupEnvironment() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Creating Python Virtual Environment {} using command
{}", virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process venvProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
+ // Wait for the venv creation with periodic checks for shutdown
+ // This allows the venv creation to be interrupted when the
process is being shut down
+ while (!venvProcess.waitFor(1, TimeUnit.SECONDS)) {
+ if (isShutdown()) {
+ logger.info("Interrupting Python Virtual Environment
creation for {} due to shutdown", componentId);
Review Comment:
The log message should be removed, since an exception is being thrown.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -346,17 +352,33 @@ private void setupEnvironment() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Creating Python Virtual Environment {} using command
{}", virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process venvProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
+ // Wait for the venv creation with periodic checks for shutdown
+ // This allows the venv creation to be interrupted when the
process is being shut down
+ while (!venvProcess.waitFor(1, TimeUnit.SECONDS)) {
+ if (isShutdown()) {
+ logger.info("Interrupting Python Virtual Environment
creation for {} due to shutdown", componentId);
+ venvProcess.destroyForcibly();
+ throw new IOException("Python process %s shutdown during
virtual environment creation".formatted(componentId));
+ }
+ }
+
+ final int result = venvProcess.exitValue();
+ if (result != 0) {
+ throw new IOException("Failed to create Python Environment " +
virtualEnvHome + ": process existed with code " + result);
+ }
} catch (final InterruptedException e) {
- throw new IOException("Interrupted while waiting for Python
virtual environment to be created");
+ // If interrupted, destroy the venv creation process and propagate
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -369,6 +391,12 @@ private void setupEnvironment() throws IOException {
}
private void installDebugPy() throws IOException {
+ // Check if we've been shutdown before starting
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -529,6 +569,26 @@ public int getProcessorCount() {
return processorPrefersIsolation.size();
}
+ /**
+ * Returns true if this process is being started for the given component
identifier.
+ * This is used to identify processes that are still in the startup phase
(venv creation)
+ * and haven't yet created any processors.
+ *
+ * @param identifier the component identifier
+ * @return true if this process is starting for the given identifier
+ */
+ public boolean isStartingFor(final String identifier) {
+ return identifier != null && identifier.equals(componentId) &&
getProcessorCount() == 0;
Review Comment:
Is `identifier` allowed to be `null`? It seems like it should be required,
enforced with `Objects.requireNonNull()`.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -377,17 +405,26 @@ private void installDebugPy() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Installing DebugPy to Virtual Env {} using command {}",
virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process pipProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
- } catch (final InterruptedException e) {
- throw new IOException("Interrupted while waiting for DebugPy to be
installed");
- }
+ // Wait for pip install with periodic checks for shutdown
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -377,17 +405,26 @@ private void installDebugPy() throws IOException {
final String command = String.join(" ", processBuilder.command());
logger.debug("Installing DebugPy to Virtual Env {} using command {}",
virtualEnvHome, command);
- final Process process = processBuilder.start();
+ final Process pipProcess = processBuilder.start();
- final int result;
try {
- result = process.waitFor();
- } catch (final InterruptedException e) {
- throw new IOException("Interrupted while waiting for DebugPy to be
installed");
- }
+ // Wait for pip install with periodic checks for shutdown
+ while (!pipProcess.waitFor(1, TimeUnit.SECONDS)) {
+ if (isShutdown()) {
+ logger.info("Interrupting DebugPy installation for {} due
to shutdown", componentId);
+ pipProcess.destroyForcibly();
+ return;
+ }
+ }
- if (result != 0) {
- throw new IOException("Failed to install DebugPy for Python
Environment " + virtualEnvHome + ": process existed with code " + result);
+ final int result = pipProcess.exitValue();
+ if (result != 0) {
+ throw new IOException("Failed to install DebugPy for Python
Environment " + virtualEnvHome + ": process existed with code " + result);
Review Comment:
Recommend using `.formatted()` instead of string concatenation for this message.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -442,9 +479,12 @@ private synchronized void killProcess() {
}
public void discoverExtensions(final List<String> directories, final
String workDirectory) {
+ logger.info("discoverExtensions called with {} directories",
directories.size());
extensionDirs = new ArrayList<>(directories);
workDir = workDirectory;
+ logger.info("Calling controller.discoverExtensions");
controller.discoverExtensions(directories, workDirectory);
+ logger.info("controller.discoverExtensions completed");
Review Comment:
These info-level log messages seem unnecessary.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java:
##########
@@ -257,20 +284,36 @@ private synchronized PythonProcess
getProcessForNextComponent(final ExtensionId
}
final PythonProcess pythonProcess = new
PythonProcess(processConfig, serviceTypeLookup, envHome,
packagedWithDependencies, extensionId.type(), componentId);
- pythonProcess.start();
- // Create list of extensions directories, including NAR
directories
- final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
- .map(File::getAbsolutePath)
- .collect(Collectors.toCollection(ArrayList::new));
- extensionsDirs.addAll(getNarDirectories());
-
- final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
- pythonProcess.discoverExtensions(extensionsDirs, workDirPath);
-
- // Add the newly create process to the processes for the given
type of processor.
+ // Add the process to the list BEFORE calling start() so that
if a component removal
+ // is requested during startup (e.g., NAR deletion during venv
creation), we can find
+ // the process and shut it down.
processesForType.add(pythonProcess);
+ logger.info("Added PythonProcess to list for component {}, now
calling start()", componentId);
+
+ try {
+ pythonProcess.start();
+ logger.info("PythonProcess.start() completed for component
{}", componentId);
+
+ // Create list of extensions directories, including NAR
directories
+ // (NAR directories were pre-computed before acquiring
this lock to avoid deadlock)
+ final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
+ .map(File::getAbsolutePath)
+ .collect(Collectors.toCollection(ArrayList::new));
+ extensionsDirs.addAll(narDirectories);
+
+ final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
+ logger.info("Calling discoverExtensions for component {}",
componentId);
+ pythonProcess.discoverExtensions(extensionsDirs,
workDirPath);
+ logger.info("discoverExtensions completed for component
{}", componentId);
+ } catch (final IOException e) {
+ // If start fails, remove the process from the list
Review Comment:
This comment is duplicative of the log message.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java:
##########
@@ -191,6 +206,18 @@ public synchronized void onProcessorRemoved(final String
identifier, final Strin
}
}
+ // If no processor was found to remove, check if any
process is still starting up
+ // for this identifier (e.g., during venv creation). If
so, shut it down.
+ if (toRemove == null) {
+ for (final PythonProcess process : processes) {
+ if (process.isStartingFor(identifier)) {
+ logger.info("Found Python Process that is
starting for identifier {}. Shutting down.", identifier);
Review Comment:
```suggestion
logger.info("Shutting down Python Process
for Processor [{}]", identifier);
```
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java:
##########
@@ -119,25 +119,41 @@ public void discoverExtensions(final List<File>
extensionDirectories) {
}
private PythonProcessorBridge createProcessorBridge(final String
identifier, final String type, final String version, final boolean
preferIsolatedProcess) {
+ logger.info("createProcessorBridge called for identifier={}, type={},
version={}", identifier, type, version);
ensureStarted();
final Optional<ExtensionId> extensionIdFound = findExtensionId(type,
version);
final ExtensionId extensionId = extensionIdFound.orElseThrow(() -> new
IllegalArgumentException("Processor Type [%s] Version [%s] not
found".formatted(type, version)));
logger.debug("Creating Python Processor Type [{}] Version [{}]",
extensionId.type(), extensionId.version());
+ logger.info("Calling getProcessorTypes() for identifier={}",
identifier);
final PythonProcessorDetails processorDetails =
getProcessorTypes().stream()
.filter(details -> details.getProcessorType().equals(type))
.filter(details -> details.getProcessorVersion().equals(version))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Could not find
Processor Details for Python Processor type [%s] or version
[%s]".formatted(type, version)));
+ logger.info("getProcessorTypes() completed for identifier={}",
identifier);
Review Comment:
These logs should be removed.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java:
##########
@@ -257,20 +284,36 @@ private synchronized PythonProcess
getProcessForNextComponent(final ExtensionId
}
final PythonProcess pythonProcess = new
PythonProcess(processConfig, serviceTypeLookup, envHome,
packagedWithDependencies, extensionId.type(), componentId);
- pythonProcess.start();
- // Create list of extensions directories, including NAR
directories
- final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
- .map(File::getAbsolutePath)
- .collect(Collectors.toCollection(ArrayList::new));
- extensionsDirs.addAll(getNarDirectories());
-
- final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
- pythonProcess.discoverExtensions(extensionsDirs, workDirPath);
-
- // Add the newly create process to the processes for the given
type of processor.
+ // Add the process to the list BEFORE calling start() so that
if a component removal
+ // is requested during startup (e.g., NAR deletion during venv
creation), we can find
+ // the process and shut it down.
processesForType.add(pythonProcess);
+ logger.info("Added PythonProcess to list for component {}, now
calling start()", componentId);
Review Comment:
This seems like it should be a debug log.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/StandardPythonProcessorBridgeTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.py4j;
+
+import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
+import org.apache.nifi.python.PythonController;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for StandardPythonProcessorBridge cancellation functionality.
+ */
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java:
##########
@@ -65,6 +66,13 @@ public Optional<PythonProcessorAdapter>
getProcessorAdapter() {
@Override
public void initialize(final PythonProcessorInitializationContext context)
{
+ // If already canceled, do not start initialization
Review Comment:
Comment does not add much value beyond reading the code.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java:
##########
@@ -257,20 +284,36 @@ private synchronized PythonProcess
getProcessForNextComponent(final ExtensionId
}
final PythonProcess pythonProcess = new
PythonProcess(processConfig, serviceTypeLookup, envHome,
packagedWithDependencies, extensionId.type(), componentId);
- pythonProcess.start();
- // Create list of extensions directories, including NAR
directories
- final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
- .map(File::getAbsolutePath)
- .collect(Collectors.toCollection(ArrayList::new));
- extensionsDirs.addAll(getNarDirectories());
-
- final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
- pythonProcess.discoverExtensions(extensionsDirs, workDirPath);
-
- // Add the newly create process to the processes for the given
type of processor.
+ // Add the process to the list BEFORE calling start() so that
if a component removal
+ // is requested during startup (e.g., NAR deletion during venv
creation), we can find
+ // the process and shut it down.
processesForType.add(pythonProcess);
+ logger.info("Added PythonProcess to list for component {}, now
calling start()", componentId);
+
+ try {
+ pythonProcess.start();
+ logger.info("PythonProcess.start() completed for component
{}", componentId);
+
+ // Create list of extensions directories, including NAR
directories
+ // (NAR directories were pre-computed before acquiring
this lock to avoid deadlock)
+ final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
+ .map(File::getAbsolutePath)
+ .collect(Collectors.toCollection(ArrayList::new));
+ extensionsDirs.addAll(narDirectories);
+
+ final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
+ logger.info("Calling discoverExtensions for component {}",
componentId);
+ pythonProcess.discoverExtensions(extensionsDirs,
workDirPath);
+ logger.info("discoverExtensions completed for component
{}", componentId);
Review Comment:
These logs should be removed.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java:
##########
@@ -119,25 +119,41 @@ public void discoverExtensions(final List<File>
extensionDirectories) {
}
private PythonProcessorBridge createProcessorBridge(final String
identifier, final String type, final String version, final boolean
preferIsolatedProcess) {
+ logger.info("createProcessorBridge called for identifier={}, type={},
version={}", identifier, type, version);
ensureStarted();
final Optional<ExtensionId> extensionIdFound = findExtensionId(type,
version);
final ExtensionId extensionId = extensionIdFound.orElseThrow(() -> new
IllegalArgumentException("Processor Type [%s] Version [%s] not
found".formatted(type, version)));
logger.debug("Creating Python Processor Type [{}] Version [{}]",
extensionId.type(), extensionId.version());
+ logger.info("Calling getProcessorTypes() for identifier={}",
identifier);
final PythonProcessorDetails processorDetails =
getProcessorTypes().stream()
.filter(details -> details.getProcessorType().equals(type))
.filter(details -> details.getProcessorVersion().equals(version))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Could not find
Processor Details for Python Processor type [%s] or version
[%s]".formatted(type, version)));
+ logger.info("getProcessorTypes() completed for identifier={}",
identifier);
final String processorHome = processorDetails.getExtensionHome();
final boolean bundledWithDependencies =
processorDetails.isBundledWithDependencies();
- final PythonProcess pythonProcess =
getProcessForNextComponent(extensionId, identifier, processorHome,
preferIsolatedProcess, bundledWithDependencies);
+ // Pre-compute NAR directories BEFORE acquiring the synchronized lock
in getProcessForNextComponent.
+ // This avoids a deadlock: getProcessForNextComponent is synchronized
on this object, and
+ // getNarDirectories calls extensionManager.getAllBundles() which is
synchronized on
+ // StandardExtensionDiscoveringManager. Meanwhile, removeBundles
(synchronized on
+ // StandardExtensionDiscoveringManager) can call onProcessorRemoved
(synchronized on this object).
+ final Set<String> narDirectories = getNarDirectories();
+
+ logger.info("Calling getProcessForNextComponent for identifier={}",
identifier);
+ final PythonProcess pythonProcess =
getProcessForNextComponent(extensionId, identifier, processorHome,
preferIsolatedProcess, bundledWithDependencies, narDirectories);
+ logger.info("getProcessForNextComponent completed for identifier={}",
identifier);
+
final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
+ logger.info("Calling pythonProcess.createProcessor for identifier={}",
identifier);
final PythonProcessorBridge processorBridge =
pythonProcess.createProcessor(identifier, type, version, workDirPath,
preferIsolatedProcess);
+ logger.info("pythonProcess.createProcessor completed for
identifier={}", identifier);
+
Review Comment:
Extra logging should be removed.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java:
##########
@@ -727,17 +727,22 @@ public synchronized Bundle getBundle(final
BundleCoordinate bundleCoordinate) {
@Override
public synchronized Set<Bundle> removeBundles(final
Collection<BundleCoordinate> bundleCoordinates) {
+ logger.info("removeBundles called for {} coordinates, acquiring lock",
bundleCoordinates.size());
Review Comment:
Recommend removing the additional logs.
##########
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarDeletionDuringInitIT.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.python;
+
+import org.apache.nifi.nar.NarState;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.tests.system.nar.NarUploadUtil;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.dto.NarSummaryDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for Python NAR deletion during processor initialization.
+ * These tests verify that:
+ * 1. Force-deleting a NAR while a Python processor is initializing does not
block indefinitely
+ * 2. The processor initialization is properly canceled when the NAR is deleted
+ * 3. The system remains in a consistent state after the operation
+ */
+public class PythonNarDeletionDuringInitIT extends NiFiSystemIT {
+
+ private static final Logger logger =
LoggerFactory.getLogger(PythonNarDeletionDuringInitIT.class);
+
+ private static final String PYTHON_TEXT_EXTENSIONS_NAR_ID =
"nifi-python-test-extensions-nar";
+ private static final String PYTHON_WRITE_BECH_32_CHARSET =
"WriteBech32Charset";
+
+ // Maximum time to wait for NAR deletion - should complete quickly with
interruptibility fix
+ private static final int NAR_DELETE_TIMEOUT_SECONDS = 30;
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createPythonicInstanceFactory();
+ }
+
+
+ /**
+ * Tests that force-deleting a NAR immediately after creating a processor
+ * does not block indefinitely. The deletion should complete within a
reasonable time
+ * even if the processor is still initializing (creating venv, downloading
dependencies, etc.).
+ */
+ @Test
+ @Timeout(value = 120, unit = TimeUnit.SECONDS)
+ public void testForceDeleteNarDuringProcessorInitialization() throws
NiFiClientException, IOException, InterruptedException {
+ final NarUploadUtil narUploadUtil = new NarUploadUtil(getNifiClient());
+
+ // Get the Python NAR file
+ final File pythonTestExtensionsNar = getPythonTestExtensionsNar();
+ assertNotNull(pythonTestExtensionsNar, "Python test extensions NAR not
found");
+
+ // Upload the NAR and wait for it to be installed
+ final NarSummaryDTO uploadedNarSummary =
narUploadUtil.uploadNar(pythonTestExtensionsNar);
+
waitFor(narUploadUtil.getWaitForNarStateSupplier(uploadedNarSummary.getIdentifier(),
NarState.INSTALLED));
+ logger.info("NAR uploaded and installed: {}",
uploadedNarSummary.getIdentifier());
+
+ // Get the processor type info
+ final DocumentedTypeDTO processorTypeDTO =
getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET);
+ assertNotNull(processorTypeDTO, "Processor type not found after NAR
installation");
+ final BundleDTO processorBundle = processorTypeDTO.getBundle();
+
+ // Create a Python processor - this triggers async initialization
+ final ProcessorEntity pythonProcessor =
getClientUtil().createProcessor(
+ PYTHON_WRITE_BECH_32_CHARSET,
+ processorBundle.getGroup(),
+ processorBundle.getArtifact(),
+ processorBundle.getVersion());
+ logger.info("Created processor: {}", pythonProcessor.getId());
+
+ // Immediately force-delete the NAR while the processor is still
initializing
+ // This should NOT block indefinitely - the initialization should be
interruptible
+ logger.info("Force-deleting NAR while processor is initializing...");
+
+ final CompletableFuture<NarSummaryDTO> deleteFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ return narUploadUtil.deleteNar(uploadedNarSummary);
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to delete NAR", e);
+ }
+ });
+
+ // The deletion should complete within the timeout - this is the key
assertion
+ // Before the fix, this would block indefinitely waiting for venv
creation to complete
+ assertDoesNotThrow(() -> {
+ try {
+ final NarSummaryDTO deletedNar =
deleteFuture.get(NAR_DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ logger.info("NAR deleted successfully: {}",
deletedNar.getIdentifier());
+ } catch (final TimeoutException e) {
+ throw new AssertionError(
+ "NAR deletion blocked for more than " +
NAR_DELETE_TIMEOUT_SECONDS + " seconds. " +
+ "This indicates the initialization was not properly
interrupted.", e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException("NAR deletion failed with
exception", e.getCause());
+ }
+ }, "NAR deletion should complete within " + NAR_DELETE_TIMEOUT_SECONDS
+ " seconds");
+
+ // Verify the NAR is gone
+ narUploadUtil.verifyNarSummaries(0);
+
+ // Verify the processor type is no longer available
+ assertNull(getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET),
+ "Processor type should not be available after NAR deletion");
+
+ // Verify the processor is now ghosted (extension missing)
+ final String pythonProcessorId = pythonProcessor.getId();
+ waitFor(() -> {
+ final ProcessorEntity processorAfterDelete =
getNifiClient().getProcessorClient().getProcessor(pythonProcessorId);
+ logger.info("Waiting for processor {} to be considered missing",
pythonProcessorId);
+ return processorAfterDelete.getComponent().getExtensionMissing();
+ });
+
+ logger.info("Test completed successfully - NAR deletion did not block
during processor initialization");
+ }
+
+ /**
+ * Tests that after a NAR is force-deleted during initialization and then
re-uploaded,
+ * a new processor can be created and functions correctly.
+ */
+ @Test
+ @Timeout(value = 180, unit = TimeUnit.SECONDS)
+ public void testNarReuploadAfterForceDeleteDuringInit() throws
NiFiClientException, IOException, InterruptedException {
+ final NarUploadUtil narUploadUtil = new NarUploadUtil(getNifiClient());
+
+ // Get the Python NAR file
+ final File pythonTestExtensionsNar = getPythonTestExtensionsNar();
+ assertNotNull(pythonTestExtensionsNar);
+
+ // Phase 1: Upload NAR, create processor, force-delete NAR during init
+ logger.info("Phase 1: Upload NAR, create processor, force-delete
during init");
+
+ final NarSummaryDTO uploadedNarSummary =
narUploadUtil.uploadNar(pythonTestExtensionsNar);
+
waitFor(narUploadUtil.getWaitForNarStateSupplier(uploadedNarSummary.getIdentifier(),
NarState.INSTALLED));
+
+ final DocumentedTypeDTO processorTypeDTO =
getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET);
+ final BundleDTO processorBundle = processorTypeDTO.getBundle();
+
+ // Create processor (triggers initialization)
+ final ProcessorEntity firstProcessor = getClientUtil().createProcessor(
+ PYTHON_WRITE_BECH_32_CHARSET,
+ processorBundle.getGroup(),
+ processorBundle.getArtifact(),
+ processorBundle.getVersion());
+
+ // Immediately force-delete NAR
+ final CompletableFuture<NarSummaryDTO> deleteFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ return narUploadUtil.deleteNar(uploadedNarSummary);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Wait for deletion (should not timeout)
+ try {
+ deleteFuture.get(NAR_DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (final TimeoutException | ExecutionException e) {
+ throw new AssertionError("NAR deletion should not block during
processor initialization", e);
+ }
+
+ // Wait for the first processor to become ghosted
+ final String firstProcessorId = firstProcessor.getId();
+ waitFor(() -> {
+ final ProcessorEntity proc =
getNifiClient().getProcessorClient().getProcessor(firstProcessorId);
+ return proc.getComponent().getExtensionMissing();
+ });
+
+ // Delete the ghosted processor to clean up
+ getNifiClient().getProcessorClient().deleteProcessor(firstProcessor);
+
+ // Phase 2: Re-upload NAR and verify new processor works
+ logger.info("Phase 2: Re-upload NAR and verify new processor works");
+
+ final NarSummaryDTO reuploadedNarSummary =
narUploadUtil.uploadNar(pythonTestExtensionsNar);
+
waitFor(narUploadUtil.getWaitForNarStateSupplier(reuploadedNarSummary.getIdentifier(),
NarState.INSTALLED));
+
+ // Verify processor type is available again
+ final DocumentedTypeDTO reloadedProcessorType =
getDocumentedTypeDTO(PYTHON_WRITE_BECH_32_CHARSET);
+ assertNotNull(reloadedProcessorType, "Processor type should be
available after NAR re-upload");
+
+ // Create a new processor and verify it initializes successfully
+ final ProcessorEntity secondProcessor =
getClientUtil().createProcessor(
+ PYTHON_WRITE_BECH_32_CHARSET,
+ reloadedProcessorType.getBundle().getGroup(),
+ reloadedProcessorType.getBundle().getArtifact(),
+ reloadedProcessorType.getBundle().getVersion());
+
+ // Wait for the processor to be loaded (not ghosted) - we don't need
it to be fully valid
+ // since that would require wiring up connections
+ final String secondProcessorId = secondProcessor.getId();
+ waitFor(() -> {
+ final ProcessorEntity proc =
getNifiClient().getProcessorClient().getProcessor(secondProcessorId);
+ // Processor is loaded when it's not marked as extension missing
+ return !proc.getComponent().getExtensionMissing();
+ });
+
+ // Verify the processor is not ghosted
+ final ProcessorEntity loadedProcessor =
getNifiClient().getProcessorClient().getProcessor(secondProcessorId);
+ assertFalse(loadedProcessor.getComponent().getExtensionMissing(),
+ "Processor should not be marked as extension missing after NAR
re-upload");
+
+ // Clean up
+ getNifiClient().getProcessorClient().deleteProcessor(loadedProcessor);
+ narUploadUtil.deleteNar(reuploadedNarSummary);
+
+ logger.info("Test completed successfully - NAR re-upload after
force-delete works correctly");
+ }
+
+ /**
+ * Tests rapid NAR upload/delete cycles to verify the system remains stable
+ * and doesn't accumulate orphaned resources.
+ */
+ @Test
+ @Timeout(value = 240, unit = TimeUnit.SECONDS)
+ public void testRapidNarUploadDeleteCycles() throws NiFiClientException,
IOException, InterruptedException {
+ final NarUploadUtil narUploadUtil = new NarUploadUtil(getNifiClient());
+ final File pythonTestExtensionsNar = getPythonTestExtensionsNar();
+ assertNotNull(pythonTestExtensionsNar);
+
+ final int cycles = 3;
+ logger.info("Running {} rapid NAR upload/delete cycles", cycles);
+
+ for (int i = 0; i < cycles; i++) {
+ logger.info("Cycle {} of {}", i + 1, cycles);
Review Comment:
Logs should be removed from this test class
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/StandardPythonProcessorBridgeTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.py4j;
+
+import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
+import org.apache.nifi.python.PythonController;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for StandardPythonProcessorBridge cancellation functionality.
+ */
+class StandardPythonProcessorBridgeTest {
+
+ @Mock
+ private PythonController controller;
+
+ @Mock
+ private ProcessorCreationWorkflow creationWorkflow;
+
+ @TempDir
+ private Path tempDir;
+
+ private StandardPythonProcessorBridge bridge;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ MockitoAnnotations.openMocks(this);
+
+ // Create a temporary module file
+ final File moduleFile =
Files.createFile(tempDir.resolve("test_processor.py")).toFile();
+ final File workDir =
Files.createDirectory(tempDir.resolve("work")).toFile();
+
+ bridge = new StandardPythonProcessorBridge.Builder()
+ .controller(controller)
+ .creationWorkflow(creationWorkflow)
+ .processorType("TestProcessor")
+ .processorVersion("1.0.0")
+ .workingDirectory(workDir)
+ .moduleFile(moduleFile)
+ .build();
+ }
+
+ @Test
+ void testCancelSetsFlag() {
+ assertFalse(bridge.isCanceled(), "Bridge should not be canceled
initially");
+
+ bridge.cancel();
+
+ assertTrue(bridge.isCanceled(), "Bridge should be canceled after
cancel() is called");
+ }
+
+ @Test
+ void testCancelSetsLoadStateToCanceled() {
+ // Initially should be in some non-canceled state
+ LoadState initialState = bridge.getLoadState();
+ assertFalse(initialState == LoadState.CANCELED, "Initial state should
not be CANCELED");
+
+ bridge.cancel();
+
+ assertEquals(LoadState.CANCELED, bridge.getLoadState(),
+ "Load state should be CANCELED after cancel() is called");
+ }
+
+ @Test
+ void testCancelIsIdempotent() {
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+
+ // Call cancel again - should not throw or change state
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+
+ // And again
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+ }
+
+ @Test
+ void testIsCanceledReturnsFalseInitially() {
+ assertFalse(bridge.isCanceled(), "isCanceled() should return false
before cancel() is called");
+ }
+
+ @Test
+ void testIsCanceledReturnsTrueAfterCancel() {
+ bridge.cancel();
+ assertTrue(bridge.isCanceled(), "isCanceled() should return true after
cancel() is called");
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.SECONDS)
+ void testCancelFromDifferentThread() throws InterruptedException {
+ final CountDownLatch cancelLatch = new CountDownLatch(1);
+ final AtomicBoolean canceledFromThread = new AtomicBoolean(false);
+
+ Thread cancelThread = new Thread(() -> {
+ bridge.cancel();
+ canceledFromThread.set(bridge.isCanceled());
+ cancelLatch.countDown();
+ });
+
+ cancelThread.start();
+ assertTrue(cancelLatch.await(5, TimeUnit.SECONDS), "Cancel should
complete within timeout");
+ assertTrue(canceledFromThread.get(), "Bridge should be canceled from
another thread");
+ assertTrue(bridge.isCanceled(), "Bridge should be canceled as seen
from main thread");
+ }
+
+ @Test
+ void testLoadStateTransitionsToCanceledOnCancel() {
+ // Get initial state
+ LoadState beforeCancel = bridge.getLoadState();
+
+ // Cancel
+ bridge.cancel();
+
+ // Verify state changed to CANCELED
+ LoadState afterCancel = bridge.getLoadState();
+ assertEquals(LoadState.CANCELED, afterCancel,
+ "Load state should transition to CANCELED, was: " +
beforeCancel);
Review Comment:
Messages for equality checks rarely add value; this one and the others
should be removed.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java:
##########
@@ -257,20 +284,36 @@ private synchronized PythonProcess
getProcessForNextComponent(final ExtensionId
}
final PythonProcess pythonProcess = new
PythonProcess(processConfig, serviceTypeLookup, envHome,
packagedWithDependencies, extensionId.type(), componentId);
- pythonProcess.start();
- // Create list of extensions directories, including NAR
directories
- final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
- .map(File::getAbsolutePath)
- .collect(Collectors.toCollection(ArrayList::new));
- extensionsDirs.addAll(getNarDirectories());
-
- final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
- pythonProcess.discoverExtensions(extensionsDirs, workDirPath);
-
- // Add the newly create process to the processes for the given
type of processor.
+ // Add the process to the list BEFORE calling start() so that
if a component removal
+ // is requested during startup (e.g., NAR deletion during venv
creation), we can find
+ // the process and shut it down.
processesForType.add(pythonProcess);
+ logger.info("Added PythonProcess to list for component {}, now
calling start()", componentId);
+
+ try {
+ pythonProcess.start();
+ logger.info("PythonProcess.start() completed for component
{}", componentId);
+
+ // Create list of extensions directories, including NAR
directories
+ // (NAR directories were pre-computed before acquiring
this lock to avoid deadlock)
+ final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
+ .map(File::getAbsolutePath)
+ .collect(Collectors.toCollection(ArrayList::new));
+ extensionsDirs.addAll(narDirectories);
+
+ final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
+ logger.info("Calling discoverExtensions for component {}",
componentId);
+ pythonProcess.discoverExtensions(extensionsDirs,
workDirPath);
+ logger.info("discoverExtensions completed for component
{}", componentId);
+ } catch (final IOException e) {
+ // If start fails, remove the process from the list
+ logger.info("PythonProcess.start() or discoverExtensions
failed for component {}, removing from list", componentId);
+ processesForType.remove(pythonProcess);
+ throw e;
+ }
+ logger.info("Returning PythonProcess for component {}",
componentId);
Review Comment:
This log statement should be removed.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java:
##########
@@ -1,131 +1,185 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.py4j;
-
-import org.apache.nifi.python.ControllerServiceTypeLookup;
-import org.apache.nifi.python.PythonProcessConfig;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.CleanupMode;
-import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-class PythonProcessTest {
-
- private static final String UNIX_BIN_DIR = "bin";
-
- private static final String WINDOWS_SCRIPTS_DIR = "Scripts";
-
- private static final String PYTHON_CMD = "python";
-
- private PythonProcess pythonProcess;
-
- @TempDir(cleanup = CleanupMode.ON_SUCCESS)
- private File virtualEnvHome;
-
- @Mock
- private PythonProcessConfig pythonProcessConfig;
-
- @Mock
- private ControllerServiceTypeLookup controllerServiceTypeLookup;
-
- @BeforeEach
- public void setUp() {
- this.pythonProcess = new PythonProcess(this.pythonProcessConfig,
this.controllerServiceTypeLookup, virtualEnvHome, false, "Controller",
"Controller");
- }
-
- @Test
- void testUsesConfiguredValueWhenPackagedWithDependencies() throws
IOException {
- when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
- final PythonProcess process = new
PythonProcess(this.pythonProcessConfig, this.controllerServiceTypeLookup,
virtualEnvHome, true, "Controller", "Controller");
- assertEquals(PYTHON_CMD, process.resolvePythonCommand());
- }
-
- @Test
- void testResolvePythonCommandWindows() throws IOException {
- final File scriptsDir = new File(virtualEnvHome, WINDOWS_SCRIPTS_DIR);
- assertTrue(scriptsDir.mkdir());
-
- when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
- final String result = this.pythonProcess.resolvePythonCommand();
-
- final String expected = getExpectedBinaryPath(WINDOWS_SCRIPTS_DIR);
- assertEquals(expected, result);
- }
-
- @Test
- void testResolvePythonCommandUnix() throws IOException {
- final File binDir = new File(virtualEnvHome, UNIX_BIN_DIR);
- assertTrue(binDir.mkdir());
-
- when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
- final String result = this.pythonProcess.resolvePythonCommand();
-
- final String expected = getExpectedBinaryPath(UNIX_BIN_DIR);
- assertEquals(expected, result);
- }
-
- @Test
- void testResolvePythonCommandFindCommand() throws IOException {
- final File binDir = new File(virtualEnvHome, UNIX_BIN_DIR);
- assertTrue(binDir.mkdir());
- final File scriptsDir = new File(virtualEnvHome, WINDOWS_SCRIPTS_DIR);
- assertTrue(scriptsDir.mkdir());
-
- final File fakeWindowsPythonExe = new File(scriptsDir, PYTHON_CMD +
".exe");
- assertTrue(fakeWindowsPythonExe.createNewFile());
-
- when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
- final String result = this.pythonProcess.resolvePythonCommand();
-
- final String expected = getExpectedBinaryPath(WINDOWS_SCRIPTS_DIR);
- assertEquals(expected, result);
- }
-
- @Test
- void testResolvePythonCommandFindCommandMissingPythonCmd() throws
IOException {
- final File binDir = new File(virtualEnvHome, UNIX_BIN_DIR);
- assertTrue(binDir.mkdir());
- final File scriptsDir = new File(virtualEnvHome, WINDOWS_SCRIPTS_DIR);
- assertTrue(scriptsDir.mkdir());
-
- when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
-
- assertThrows(IOException.class, () ->
this.pythonProcess.resolvePythonCommand());
- }
-
- @Test
- void testResolvePythonCommandNone() {
- when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
- assertThrows(IOException.class, () ->
this.pythonProcess.resolvePythonCommand());
- }
-
- private String getExpectedBinaryPath(String binarySubDirectoryName) {
- return this.virtualEnvHome.getAbsolutePath() + File.separator +
binarySubDirectoryName + File.separator + PYTHON_CMD;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.py4j;
+
+import org.apache.nifi.python.ControllerServiceTypeLookup;
+import org.apache.nifi.python.PythonProcessConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.CleanupMode;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class PythonProcessTest {
+
+ private static final String UNIX_BIN_DIR = "bin";
+
+ private static final String WINDOWS_SCRIPTS_DIR = "Scripts";
+
+ private static final String PYTHON_CMD = "python";
+
+ private PythonProcess pythonProcess;
+
+ @TempDir(cleanup = CleanupMode.ON_SUCCESS)
+ private File virtualEnvHome;
+
+ @Mock
+ private PythonProcessConfig pythonProcessConfig;
+
+ @Mock
+ private ControllerServiceTypeLookup controllerServiceTypeLookup;
+
+ @BeforeEach
+ public void setUp() {
+ this.pythonProcess = new PythonProcess(this.pythonProcessConfig,
this.controllerServiceTypeLookup, virtualEnvHome, false, "Controller",
"Controller");
+ }
+
+ @Test
+ void testUsesConfiguredValueWhenPackagedWithDependencies() throws
IOException {
+ when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
+ final PythonProcess process = new
PythonProcess(this.pythonProcessConfig, this.controllerServiceTypeLookup,
virtualEnvHome, true, "Controller", "Controller");
+ assertEquals(PYTHON_CMD, process.resolvePythonCommand());
+ }
+
+ @Test
+ void testResolvePythonCommandWindows() throws IOException {
+ final File scriptsDir = new File(virtualEnvHome, WINDOWS_SCRIPTS_DIR);
+ assertTrue(scriptsDir.mkdir());
+
+ when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
+ final String result = this.pythonProcess.resolvePythonCommand();
+
+ final String expected = getExpectedBinaryPath(WINDOWS_SCRIPTS_DIR);
+ assertEquals(expected, result);
+ }
+
+ @Test
+ void testResolvePythonCommandUnix() throws IOException {
+ final File binDir = new File(virtualEnvHome, UNIX_BIN_DIR);
+ assertTrue(binDir.mkdir());
+
+ when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
+ final String result = this.pythonProcess.resolvePythonCommand();
+
+ final String expected = getExpectedBinaryPath(UNIX_BIN_DIR);
+ assertEquals(expected, result);
+ }
+
+ @Test
+ void testResolvePythonCommandFindCommand() throws IOException {
+ final File binDir = new File(virtualEnvHome, UNIX_BIN_DIR);
+ assertTrue(binDir.mkdir());
+ final File scriptsDir = new File(virtualEnvHome, WINDOWS_SCRIPTS_DIR);
+ assertTrue(scriptsDir.mkdir());
+
+ final File fakeWindowsPythonExe = new File(scriptsDir, PYTHON_CMD +
".exe");
+ assertTrue(fakeWindowsPythonExe.createNewFile());
+
+ when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
+ final String result = this.pythonProcess.resolvePythonCommand();
+
+ final String expected = getExpectedBinaryPath(WINDOWS_SCRIPTS_DIR);
+ assertEquals(expected, result);
+ }
+
+ @Test
+ void testResolvePythonCommandFindCommandMissingPythonCmd() throws
IOException {
+ final File binDir = new File(virtualEnvHome, UNIX_BIN_DIR);
+ assertTrue(binDir.mkdir());
+ final File scriptsDir = new File(virtualEnvHome, WINDOWS_SCRIPTS_DIR);
+ assertTrue(scriptsDir.mkdir());
+
+ when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
+
+ assertThrows(IOException.class, () ->
this.pythonProcess.resolvePythonCommand());
+ }
+
+ @Test
+ void testResolvePythonCommandNone() {
+ when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
+ assertThrows(IOException.class, () ->
this.pythonProcess.resolvePythonCommand());
+ }
+
+ private String getExpectedBinaryPath(String binarySubDirectoryName) {
+ return this.virtualEnvHome.getAbsolutePath() + File.separator +
binarySubDirectoryName + File.separator + PYTHON_CMD;
+ }
+
+ /**
+ * Tests that the PythonProcess can be shutdown even if it hasn't been
started.
+ */
Review Comment:
The method comments in this class do not add much value and should be
removed.
##########
nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/StandardPythonProcessorBridgeTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.py4j;
+
+import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
+import org.apache.nifi.python.PythonController;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for StandardPythonProcessorBridge cancellation functionality.
+ */
+class StandardPythonProcessorBridgeTest {
+
+ @Mock
+ private PythonController controller;
+
+ @Mock
+ private ProcessorCreationWorkflow creationWorkflow;
+
+ @TempDir
+ private Path tempDir;
+
+ private StandardPythonProcessorBridge bridge;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ MockitoAnnotations.openMocks(this);
+
+ // Create a temporary module file
+ final File moduleFile =
Files.createFile(tempDir.resolve("test_processor.py")).toFile();
+ final File workDir =
Files.createDirectory(tempDir.resolve("work")).toFile();
+
+ bridge = new StandardPythonProcessorBridge.Builder()
+ .controller(controller)
+ .creationWorkflow(creationWorkflow)
+ .processorType("TestProcessor")
+ .processorVersion("1.0.0")
+ .workingDirectory(workDir)
+ .moduleFile(moduleFile)
+ .build();
+ }
+
+ @Test
+ void testCancelSetsFlag() {
+ assertFalse(bridge.isCanceled(), "Bridge should not be canceled
initially");
+
+ bridge.cancel();
+
+ assertTrue(bridge.isCanceled(), "Bridge should be canceled after
cancel() is called");
+ }
+
+ @Test
+ void testCancelSetsLoadStateToCanceled() {
+ // Initially should be in some non-canceled state
+ LoadState initialState = bridge.getLoadState();
+ assertFalse(initialState == LoadState.CANCELED, "Initial state should
not be CANCELED");
+
+ bridge.cancel();
+
+ assertEquals(LoadState.CANCELED, bridge.getLoadState(),
+ "Load state should be CANCELED after cancel() is called");
+ }
+
+ @Test
+ void testCancelIsIdempotent() {
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+
+ // Call cancel again - should not throw or change state
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+
+ // And again
+ bridge.cancel();
+ assertTrue(bridge.isCanceled());
+ assertEquals(LoadState.CANCELED, bridge.getLoadState());
+ }
+
+ @Test
+ void testIsCanceledReturnsFalseInitially() {
+ assertFalse(bridge.isCanceled(), "isCanceled() should return false
before cancel() is called");
+ }
+
+ @Test
+ void testIsCanceledReturnsTrueAfterCancel() {
+ bridge.cancel();
+ assertTrue(bridge.isCanceled(), "isCanceled() should return true after
cancel() is called");
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.SECONDS)
+ void testCancelFromDifferentThread() throws InterruptedException {
+ final CountDownLatch cancelLatch = new CountDownLatch(1);
+ final AtomicBoolean canceledFromThread = new AtomicBoolean(false);
+
+ Thread cancelThread = new Thread(() -> {
+ bridge.cancel();
+ canceledFromThread.set(bridge.isCanceled());
+ cancelLatch.countDown();
+ });
+
+ cancelThread.start();
+ assertTrue(cancelLatch.await(5, TimeUnit.SECONDS), "Cancel should
complete within timeout");
+ assertTrue(canceledFromThread.get(), "Bridge should be canceled from
another thread");
+ assertTrue(bridge.isCanceled(), "Bridge should be canceled as seen
from main thread");
+ }
+
+ @Test
+ void testLoadStateTransitionsToCanceledOnCancel() {
+ // Get initial state
+ LoadState beforeCancel = bridge.getLoadState();
+
+ // Cancel
+ bridge.cancel();
+
+ // Verify state changed to CANCELED
Review Comment:
Comments should be removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]