This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 265b132e73 NIFI-11493: Defaulted dynamically modified classpath fix 265b132e73 is described below commit 265b132e73c01c956ec679bbbd339ce4649cdb54 Author: Lehel Boér <lehe...@hotmail.com> AuthorDate: Thu Apr 27 21:43:12 2023 +0200 NIFI-11493: Defaulted dynamically modified classpath fix This closes #7201. Co-authored-by: Peter Turcsanyi <turcsa...@apache.org> Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi/controller/AbstractComponentNode.java | 38 +++++--- .../DefaultedDynamicallyModifyClasspath.java | 104 +++++++++++++++++++++ .../services/org.apache.nifi.processor.Processor | 1 + .../DefaultedDynamicClassPathModificationIT.java | 94 +++++++++++++++++++ 4 files changed, 222 insertions(+), 15 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index 07200140a8..94d87ad6f9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -604,6 +604,14 @@ public abstract class AbstractComponentNode implements ComponentNode { return getProperty(property).getEffectiveValue(getParameterContext()); } + private String getEffectivePropertyValueWithDefault(final PropertyDescriptor property) { + String value = getProperty(property).getEffectiveValue(getParameterContext()); + if (value == null) { + value = property.getDefaultValue(); + } + return value; + } + @Override public String getRawPropertyValue(final PropertyDescriptor property) { return getProperty(property).getRawValue(); @@ -662,23 +670,23 @@ public abstract class AbstractComponentNode implements ComponentNode { */ @Override public synchronized void reloadAdditionalResourcesIfNecessary() { - // Components that don't have any PropertyDescriptors marked `dynamicallyModifiesClasspath` - // won't have the fingerprint i.e. will be null, in such cases do nothing - if (additionalResourcesFingerprint == null) { - return; - } - final Set<PropertyDescriptor> descriptors = this.getProperties().keySet(); - final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors); - final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey()); - if(!StringUtils.equals(additionalResourcesFingerprint, newFingerprint)) { - setAdditionalResourcesFingerprint(newFingerprint); - try { - logger.info("Updating classpath for " + this.componentType + " with the ID " + this.getIdentifier()); - reload(additionalUrls); - } catch (Exception e) { - logger.error("Error reloading component with id " + id + ": " + e.getMessage(), e); + final boolean dynamicallyModifiesClasspath = descriptors.stream() + .anyMatch(PropertyDescriptor::isDynamicClasspathModifier); + + if (dynamicallyModifiesClasspath) { + final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors, this::getEffectivePropertyValueWithDefault); + + final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey()); + if (!StringUtils.equals(additionalResourcesFingerprint, newFingerprint)) { + setAdditionalResourcesFingerprint(newFingerprint); + try { + logger.info("Updating classpath for [{}] with the ID [{}]", this.componentType, this.getIdentifier()); + reload(additionalUrls); + } catch (Exception e) { + logger.error("Error reloading component with id [{}]: {}", id, e.getMessage(), e); + } } } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java new file mode 100644 index 0000000000..673c3c34ed --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.BufferedWriter; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@RequiresInstanceClassLoading +public class DefaultedDynamicallyModifyClasspath extends AbstractProcessor { + + static final PropertyDescriptor URLS = new PropertyDescriptor.Builder() + .name("URLs to Load") + .description("URLs to load onto the classpath") + .required(false) + .defaultValue("lib/bootstrap/commons-lang3-3.12.0.jar") + .dynamicallyModifiesClasspath(true) + .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY) + .build(); + + static final PropertyDescriptor CLASS_TO_LOAD = new PropertyDescriptor.Builder() + .name("Class to Load") + .description("The name of the Class to load") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if the specified class can be loaded") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if the specified class cannot be loaded") + .build(); + + + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return Arrays.asList(URLS, CLASS_TO_LOAD); + } + + @Override + public Set<Relationship> getRelationships() { + return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)); + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String classToLoad = context.getProperty(CLASS_TO_LOAD).getValue(); + try { + final Class<?> clazz = Class.forName(classToLoad); + try (final OutputStream out = session.write(flowFile); + final OutputStreamWriter streamWriter = new OutputStreamWriter(out); + final BufferedWriter writer = new BufferedWriter(streamWriter)) { + + writer.write(clazz.getName()); + writer.newLine(); + writer.write(clazz.getClassLoader().toString()); + } + + session.transfer(flowFile, REL_SUCCESS); + } catch (final Exception e) { + session.transfer(flowFile, REL_FAILURE); + } + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a7919d8331..1f278c32ac 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -21,6 +21,7 @@ org.apache.nifi.processors.tests.system.DependOnProperties org.apache.nifi.processors.tests.system.DoNotTransferFlowFile org.apache.nifi.processors.tests.system.Duplicate org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath +org.apache.nifi.processors.tests.system.DefaultedDynamicallyModifyClasspath org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes org.apache.nifi.processors.tests.system.FakeProcessor diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java new file mode 100644 index 0000000000..5d0c7dfeaf --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.processor; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +class DefaultedDynamicClassPathModificationIT extends NiFiSystemIT { + + private ProcessorEntity generateFlowFileProcessor; + private ProcessorEntity defaultedModifyClasspathProcessor; + + private ConnectionEntity defaultedModifyClasspathInputConnection; + private ConnectionEntity successConnection; + private ConnectionEntity failureConnection; + + @Test + void testLoadsClassFromDefaultedDynamicModification() throws NiFiClientException, IOException, InterruptedException { + createFlow(); + + // Update modify to have the appropriate URL, don't update URL to load to let it on default value + final Map<String, String> propertyMap = new HashMap<>(); + propertyMap.put("Class to Load", "org.apache.commons.lang3.StringUtils"); + getClientUtil().updateProcessorProperties(defaultedModifyClasspathProcessor, propertyMap); + getClientUtil().waitForValidProcessor(defaultedModifyClasspathProcessor.getId()); + + // Create a FlowFile + getClientUtil().waitForValidProcessor(generateFlowFileProcessor.getId()); + getClientUtil().startProcessor(generateFlowFileProcessor); + waitForQueueCount(defaultedModifyClasspathInputConnection.getId(), 1); + + // Wait for a FlowFile to be routed to success + getClientUtil().startProcessor(defaultedModifyClasspathProcessor); + waitForQueueCount(successConnection.getId(), 1); + + getClientUtil().stopProcessor(generateFlowFileProcessor); + getClientUtil().waitForStoppedProcessor(generateFlowFileProcessor.getId()); + + // Restart and ensure that everything works as expected after restart + getNiFiInstance().stop(); + getNiFiInstance().start(true); + + // Feed another FlowFile through. Upon restart, in order to modify, we need to get the most up-to-date revision so will first fetch the Processor + final ProcessorEntity generateAfterRestart = getNifiClient().getProcessorClient().getProcessor(generateFlowFileProcessor.getId()); + getClientUtil().waitForValidProcessor(generateAfterRestart.getId()); + getClientUtil().startProcessor(generateAfterRestart); + + // Depending on whether or not the flow was written out with the processor running, the Modify processor may or may not be running. Ensure that it is running. + getClientUtil().waitForValidationCompleted(defaultedModifyClasspathProcessor); + final ProcessorEntity modifyAfterRestart = getNifiClient().getProcessorClient().getProcessor(defaultedModifyClasspathProcessor.getId()); + final String modifyRunStatus = modifyAfterRestart.getStatus().getRunStatus(); + if (!"Running".equalsIgnoreCase(modifyRunStatus)) { + getClientUtil().startProcessor(modifyAfterRestart); + } + + // We now expect 2 FlowFiles to be in the success route + waitForQueueCount(successConnection.getId(), 2); + } + + // We have several tests running the same flow but with different configuration. Since we need to reference the ProcessorEntities and ConnectionEntities, we have a method + // that creates the flow and stores the entities are member variables + private void createFlow() throws NiFiClientException, IOException { + generateFlowFileProcessor = getClientUtil().createProcessor("GenerateFlowFile"); + defaultedModifyClasspathProcessor = getClientUtil().createProcessor("DefaultedDynamicallyModifyClasspath"); + ProcessorEntity terminateSuccess = getClientUtil().createProcessor("TerminateFlowFile"); + ProcessorEntity terminateFailure = getClientUtil().createProcessor("TerminateFlowFile"); + + defaultedModifyClasspathInputConnection = getClientUtil().createConnection(generateFlowFileProcessor, defaultedModifyClasspathProcessor, "success"); + successConnection = getClientUtil().createConnection(defaultedModifyClasspathProcessor, terminateSuccess, "success"); + failureConnection = getClientUtil().createConnection(defaultedModifyClasspathProcessor, terminateFailure, "failure"); + } +}