This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new ab20a93a90 NIFI-11493: Defaulted dynamically modified classpath fix
ab20a93a90 is described below

commit ab20a93a90ab571df1d033db0d77e166e6ea10af
Author: Lehel Boér <lehe...@hotmail.com>
AuthorDate: Thu Apr 27 21:43:12 2023 +0200

    NIFI-11493: Defaulted dynamically modified classpath fix
    
    This closes #7201.
    
    Co-authored-by: Peter Turcsanyi <turcsa...@apache.org>
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi/controller/AbstractComponentNode.java     |  38 +++++---
 .../DefaultedDynamicallyModifyClasspath.java       | 104 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../DefaultedDynamicClassPathModificationIT.java   |  94 +++++++++++++++++++
 4 files changed, 222 insertions(+), 15 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 5935bf7cc9..59dfa865d5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -604,6 +604,14 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
         return getProperty(property).getEffectiveValue(getParameterContext());
     }
 
+    private String getEffectivePropertyValueWithDefault(final 
PropertyDescriptor property) {
+        String value = 
getProperty(property).getEffectiveValue(getParameterContext());
+        if (value == null) {
+            value = property.getDefaultValue();
+        }
+        return value;
+    }
+
     @Override
     public String getRawPropertyValue(final PropertyDescriptor property) {
         return getProperty(property).getRawValue();
@@ -662,23 +670,23 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
      */
     @Override
     public synchronized void reloadAdditionalResourcesIfNecessary() {
-        // Components that don't have any PropertyDescriptors marked 
`dynamicallyModifiesClasspath`
-        // won't have the fingerprint i.e. will be null, in such cases do 
nothing
-        if (additionalResourcesFingerprint == null) {
-            return;
-        }
-
         final Set<PropertyDescriptor> descriptors = 
this.getProperties().keySet();
-        final Set<URL> additionalUrls = 
this.getAdditionalClasspathResources(descriptors);
 
-        final String newFingerprint = 
ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, 
determineClasloaderIsolationKey());
-        if(!StringUtils.equals(additionalResourcesFingerprint, 
newFingerprint)) {
-            setAdditionalResourcesFingerprint(newFingerprint);
-            try {
-                logger.info("Updating classpath for " + this.componentType + " 
with the ID " + this.getIdentifier());
-                reload(additionalUrls);
-            } catch (Exception e) {
-                logger.error("Error reloading component with id " + id + ": " 
+ e.getMessage(), e);
+        final boolean dynamicallyModifiesClasspath = descriptors.stream()
+                .anyMatch(PropertyDescriptor::isDynamicClasspathModifier);
+
+        if (dynamicallyModifiesClasspath) {
+            final Set<URL> additionalUrls = 
this.getAdditionalClasspathResources(descriptors, 
this::getEffectivePropertyValueWithDefault);
+
+            final String newFingerprint = 
ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, 
determineClasloaderIsolationKey());
+            if (!StringUtils.equals(additionalResourcesFingerprint, 
newFingerprint)) {
+                setAdditionalResourcesFingerprint(newFingerprint);
+                try {
+                    logger.info("Updating classpath for [{}] with the ID 
[{}]", this.componentType, this.getIdentifier());
+                    reload(additionalUrls);
+                } catch (Exception e) {
+                    logger.error("Error reloading component with id [{}]: {}", 
id, e.getMessage(), e);
+                }
             }
         }
     }
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java
new file mode 100644
index 0000000000..673c3c34ed
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.BufferedWriter;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@RequiresInstanceClassLoading
+public class DefaultedDynamicallyModifyClasspath extends AbstractProcessor {
+
+    static final PropertyDescriptor URLS = new PropertyDescriptor.Builder()
+            .name("URLs to Load")
+            .description("URLs to load onto the classpath")
+            .required(false)
+            .defaultValue("lib/bootstrap/commons-lang3-3.12.0.jar")
+            .dynamicallyModifiesClasspath(true)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, 
ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY)
+            .build();
+
+    static final PropertyDescriptor CLASS_TO_LOAD = new 
PropertyDescriptor.Builder()
+            .name("Class to Load")
+            .description("The name of the Class to load")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles are routed to this relationship if the 
specified class can be loaded")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to this relationship if the 
specified class cannot be loaded")
+            .build();
+
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(URLS, CLASS_TO_LOAD);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String classToLoad = 
context.getProperty(CLASS_TO_LOAD).getValue();
+        try {
+            final Class<?> clazz = Class.forName(classToLoad);
+            try (final OutputStream out = session.write(flowFile);
+                 final OutputStreamWriter streamWriter = new 
OutputStreamWriter(out);
+                 final BufferedWriter writer = new 
BufferedWriter(streamWriter)) {
+
+                writer.write(clazz.getName());
+                writer.newLine();
+                writer.write(clazz.getClassLoader().toString());
+            }
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a7919d8331..1f278c32ac 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -21,6 +21,7 @@ org.apache.nifi.processors.tests.system.DependOnProperties
 org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
 org.apache.nifi.processors.tests.system.Duplicate
 org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath
+org.apache.nifi.processors.tests.system.DefaultedDynamicallyModifyClasspath
 org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
 org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
 org.apache.nifi.processors.tests.system.FakeProcessor
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java
new file mode 100644
index 0000000000..5d0c7dfeaf
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+class DefaultedDynamicClassPathModificationIT extends NiFiSystemIT {
+
+    private ProcessorEntity generateFlowFileProcessor;
+    private ProcessorEntity defaultedModifyClasspathProcessor;
+
+    private ConnectionEntity defaultedModifyClasspathInputConnection;
+    private ConnectionEntity successConnection;
+    private ConnectionEntity failureConnection;
+
+    @Test
+    void testLoadsClassFromDefaultedDynamicModification() throws 
NiFiClientException, IOException, InterruptedException {
+        createFlow();
+
+        // Update modify to have the appropriate URL, don't update URL to load 
to let it on default value
+        final Map<String, String> propertyMap = new HashMap<>();
+        propertyMap.put("Class to Load", 
"org.apache.commons.lang3.StringUtils");
+        
getClientUtil().updateProcessorProperties(defaultedModifyClasspathProcessor, 
propertyMap);
+        
getClientUtil().waitForValidProcessor(defaultedModifyClasspathProcessor.getId());
+
+        // Create a FlowFile
+        
getClientUtil().waitForValidProcessor(generateFlowFileProcessor.getId());
+        getClientUtil().startProcessor(generateFlowFileProcessor);
+        waitForQueueCount(defaultedModifyClasspathInputConnection.getId(), 1);
+
+        // Wait for a FlowFile to be routed to success
+        getClientUtil().startProcessor(defaultedModifyClasspathProcessor);
+        waitForQueueCount(successConnection.getId(), 1);
+
+        getClientUtil().stopProcessor(generateFlowFileProcessor);
+        
getClientUtil().waitForStoppedProcessor(generateFlowFileProcessor.getId());
+
+        // Restart and ensure that everything works as expected after restart
+        getNiFiInstance().stop();
+        getNiFiInstance().start(true);
+
+        // Feed another FlowFile through. Upon restart, in order to modify, we 
need to get the most up-to-date revision so will first fetch the Processor
+        final ProcessorEntity generateAfterRestart = 
getNifiClient().getProcessorClient().getProcessor(generateFlowFileProcessor.getId());
+        getClientUtil().waitForValidProcessor(generateAfterRestart.getId());
+        getClientUtil().startProcessor(generateAfterRestart);
+
+        // Depending on whether or not the flow was written out with the 
processor running, the Modify processor may or may not be running. Ensure that 
it is running.
+        
getClientUtil().waitForValidationCompleted(defaultedModifyClasspathProcessor);
+        final ProcessorEntity modifyAfterRestart = 
getNifiClient().getProcessorClient().getProcessor(defaultedModifyClasspathProcessor.getId());
+        final String modifyRunStatus = 
modifyAfterRestart.getStatus().getRunStatus();
+        if (!"Running".equalsIgnoreCase(modifyRunStatus)) {
+            getClientUtil().startProcessor(modifyAfterRestart);
+        }
+
+        // We now expect 2 FlowFiles to be in the success route
+        waitForQueueCount(successConnection.getId(), 2);
+    }
+
+    // We have several tests running the same flow but with different 
configuration. Since we need to reference the ProcessorEntities and 
ConnectionEntities, we have a method
+    // that creates the flow and stores the entities are member variables
+    private void createFlow() throws NiFiClientException, IOException {
+        generateFlowFileProcessor = 
getClientUtil().createProcessor("GenerateFlowFile");
+        defaultedModifyClasspathProcessor = 
getClientUtil().createProcessor("DefaultedDynamicallyModifyClasspath");
+        ProcessorEntity terminateSuccess = 
getClientUtil().createProcessor("TerminateFlowFile");
+        ProcessorEntity terminateFailure = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        defaultedModifyClasspathInputConnection = 
getClientUtil().createConnection(generateFlowFileProcessor, 
defaultedModifyClasspathProcessor, "success");
+        successConnection = 
getClientUtil().createConnection(defaultedModifyClasspathProcessor, 
terminateSuccess, "success");
+        failureConnection = 
getClientUtil().createConnection(defaultedModifyClasspathProcessor, 
terminateFailure, "failure");
+    }
+}

Reply via email to