This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dfbf2e3  NIFI-9286: JOLT Expression Language. Fixes NIFI-6213 and adds 
functionality to use expression language in class and module specification. 
NIFI-9286: adding JOLT unit tests. NIFI-9286: addressing PR feedback. Fixes a 
problem with the scope of the EL for module directory. NIFI-9286: alignment of 
JOLT processors. NIFI-9286: fix checkstyle.
dfbf2e3 is described below

commit dfbf2e3cea5485196c18934d0c01f49103f12727
Author: levilentz <levile...@gmail.com>
AuthorDate: Tue Oct 5 14:46:13 2021 -0700

    NIFI-9286: JOLT Expression Language
    Fixes NIFI-6213 and adds functionality to use expression language in
    class and module specification
    NIFI-9286: adding JOLT unit tests
    NIFI-9286: addressing PR feedback
    Fixes a problem with the scope of the EL for module directory
    NIFI-9286: alignment of JOLT processors
    NIFI-9286: fix checkstyle
    
    This closes #5444
    
    Signed-off-by: Mike Thomsen <mthom...@apache.org>
---
 .../jolt/record/JoltTransformRecord.java           | 47 ++++++++++++-------
 .../jolt/record/TestJoltTransformRecord.java       | 28 +++++++++++
 .../processors/standard/JoltTransformJSON.java     | 54 +++++++++++++++-------
 .../processors/standard/TestJoltTransformJSON.java | 27 +++++++++++
 4 files changed, 123 insertions(+), 33 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
index 0b48901..9293e4c 100644
--- 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
+++ 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
@@ -20,14 +20,15 @@ import com.bazaarvoice.jolt.ContextualTransform;
 import com.bazaarvoice.jolt.JoltTransform;
 import com.bazaarvoice.jolt.JsonUtils;
 import com.bazaarvoice.jolt.Transform;
+import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -60,7 +61,6 @@ import 
org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
 
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -89,6 +89,7 @@ import java.util.stream.Collectors;
 @CapabilityDescription("Applies a list of Jolt specifications to the FlowFile 
payload. A new FlowFile is created "
         + "with transformed content and is routed to the 'success' 
relationship. If the transform "
         + "fails, the original FlowFile is routed to the 'failure' 
relationship.")
+@RequiresInstanceClassLoading
 public class JoltTransformRecord extends AbstractProcessor {
 
     static final AllowableValue SHIFTR
@@ -151,8 +152,9 @@ public class JoltTransformRecord extends AbstractProcessor {
             .displayName("Custom Transformation Class Name")
             .description("Fully Qualified Class Name for Custom 
Transformation")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
@@ -163,6 +165,7 @@ public class JoltTransformRecord extends AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .identifiesExternalResource(ResourceCardinality.MULTIPLE, 
ResourceType.FILE, ResourceType.DIRECTORY)
             .dynamicallyModifiesClasspath(true)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new 
PropertyDescriptor.Builder()
@@ -200,7 +203,7 @@ public class JoltTransformRecord extends AbstractProcessor {
      * For some cases the key could be empty. It means that it represents 
default transform (e.g. for custom transform
      * when there is no jolt-record-spec specified).
      */
-    private LoadingCache<Optional<String>, JoltTransform> transformCache;
+    private Cache<Optional<String>, JoltTransform> transformCache;
 
     static {
         final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -235,7 +238,6 @@ public class JoltTransformRecord extends AbstractProcessor {
         final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
         final String transform = 
validationContext.getProperty(JOLT_TRANSFORM).getValue();
         final String customTransform = 
validationContext.getProperty(CUSTOM_CLASS).getValue();
-
         if (!validationContext.getProperty(JOLT_SPEC).isSet() || 
StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
             if (!SORTR.getValue().equals(transform)) {
                 final String message = "A specification is required for this 
transformation";
@@ -247,7 +249,7 @@ public class JoltTransformRecord extends AbstractProcessor {
             try {
                 final String specValue = 
validationContext.getProperty(JOLT_SPEC).getValue();
 
-                if (validationContext.isExpressionLanguagePresent(specValue)) {
+                if (validationContext.isExpressionLanguagePresent(specValue) ) 
{
                     final String invalidExpressionMsg = 
validationContext.newExpressionLanguageCompiler().validateExpression(specValue, 
true);
                     if (!StringUtils.isEmpty(invalidExpressionMsg)) {
                         results.add(new ValidationResult.Builder().valid(false)
@@ -265,6 +267,14 @@ public class JoltTransformRecord extends AbstractProcessor 
{
                             results.add(new 
ValidationResult.Builder().valid(false)
                                     .explanation(customMessage)
                                     .build());
+                        } else if 
(validationContext.isExpressionLanguagePresent(customTransform)) {
+                            final String invalidExpressionMsg = 
validationContext.newExpressionLanguageCompiler().validateExpression(customTransform,
 true);
+                            if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+                                results.add(new 
ValidationResult.Builder().valid(false)
+                                        .subject(CUSTOM_CLASS.getDisplayName())
+                                        .explanation("Invalid Expression 
Language: " + invalidExpressionMsg)
+                                        .build());
+                            }
                         } else {
                             
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 customTransform, specJson);
                         }
@@ -273,7 +283,7 @@ public class JoltTransformRecord extends AbstractProcessor {
                     }
                 }
             } catch (final Exception e) {
-                getLogger().info("Processor is not valid - " + e.toString());
+                getLogger().info("Processor is not valid - ", e);
                 String message = "Specification not valid for the selected 
transformation.";
                 results.add(new ValidationResult.Builder().valid(false)
                         .explanation(message)
@@ -443,7 +453,14 @@ public class JoltTransformRecord extends AbstractProcessor 
{
             specString = Optional.empty();
         }
 
-        return transformCache.get(specString);
+        return transformCache.get(specString, currString -> {
+            try {
+                return createTransform(context, currString.orElse(null), 
flowFile);
+            } catch (Exception e) {
+                getLogger().error("Problem getting transform", e);
+            }
+            return null;
+        });
     }
 
     @OnScheduled
@@ -451,10 +468,10 @@ public class JoltTransformRecord extends 
AbstractProcessor {
         int maxTransformsToCache = 
context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
         transformCache = Caffeine.newBuilder()
                 .maximumSize(maxTransformsToCache)
-                .build(specString -> createTransform(context, 
specString.orElse(null)));
+                .build();
     }
 
-    private JoltTransform createTransform(final ProcessContext context, final 
String specString) throws Exception {
+    private JoltTransform createTransform(final ProcessContext context, final 
String specString, final FlowFile flowFile) throws Exception {
         final Object specJson;
         if (context.getProperty(JOLT_SPEC).isSet() && 
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
             specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@@ -463,16 +480,12 @@ public class JoltTransformRecord extends 
AbstractProcessor {
         }
 
         if 
(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
-            return 
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+            return 
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 
context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(),
 specJson);
         } else {
             return 
TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), 
context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
         }
     }
 
-    protected FilenameFilter getJarFilenameFilter() {
-        return (dir, name) -> (name != null && name.endsWith(".jar"));
-    }
-
     protected static Object transform(JoltTransform joltTransform, Object 
input) {
         return joltTransform instanceof ContextualTransform
                 ? ((ContextualTransform) joltTransform).transform(input, 
Collections.emptyMap()) : ((Transform) joltTransform).transform(input);
diff --git 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
index 6c4188b..288efe0 100644
--- 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
+++ 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
@@ -39,6 +39,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -583,6 +584,33 @@ 
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
     }
 
     @Test
+    public void testExpressionLanguageJarFile() throws IOException {
+        generateTestData(1, null);
+        final String outputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, 
outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        URL t = 
getClass().getResource("/TestJoltTransformRecord/TestCustomJoltTransform.jar");
+        assert t != null;
+        final String customJarPath = t.getPath();
+        final String spec = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/customChainrSpec.json")));
+        final String customJoltTransform = "TestCustomJoltTransform";
+        final String customClass = "TestCustomJoltTransform";
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${JOLT_SPEC}");
+        runner.setProperty(JoltTransformRecord.MODULES, customJarPath);
+        runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, 
"${CUSTOM_CLASS}");
+        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, 
JoltTransformRecord.CUSTOMR);
+        runner.setVariable("CUSTOM_JAR", customJarPath);
+        Map<String, String> customSpecs = new HashMap<>();
+        customSpecs.put("JOLT_SPEC", spec);
+        customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
+        customSpecs.put("CUSTOM_CLASS", customClass);
+        runner.enqueue(new byte[0], customSpecs);
+        runner.assertValid();
+    }
+
+    @Test
     public void testJoltSpecEL() throws IOException {
         generateTestData(1, null);
         final String outputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index 7c7d649..040a9bd 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -18,13 +18,14 @@ package org.apache.nifi.processors.standard;
 
 import com.bazaarvoice.jolt.JoltTransform;
 import com.bazaarvoice.jolt.JsonUtils;
+import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
 @CapabilityDescription("Applies a list of Jolt specifications to the flowfile 
JSON payload. A new FlowFile is created "
         + "with transformed content and is routed to the 'success' 
relationship. If the JSON transform "
         + "fails, the original FlowFile is routed to the 'failure' 
relationship.")
+@RequiresInstanceClassLoading
 public class JoltTransformJSON extends AbstractProcessor {
 
     public static final AllowableValue SHIFTR = new 
AllowableValue("jolt-transform-shift", "Shift", "Shift input JSON/data to 
create the output JSON.");
@@ -109,8 +111,9 @@ public class JoltTransformJSON extends AbstractProcessor {
             .displayName("Custom Transformation Class Name")
             .description("Fully Qualified Class Name for Custom 
Transformation")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     public static final PropertyDescriptor MODULES = new 
PropertyDescriptor.Builder()
@@ -119,7 +122,9 @@ public class JoltTransformJSON extends AbstractProcessor {
             .description("Comma-separated list of paths to files and/or 
directories which contain modules containing custom transformations (that are 
not included on NiFi's classpath).")
             .required(false)
             .identifiesExternalResource(ResourceCardinality.MULTIPLE, 
ResourceType.FILE, ResourceType.DIRECTORY)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .dynamicallyModifiesClasspath(true)
+            .dependsOn(JOLT_SPEC, CUSTOMR)
             .build();
 
     static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new 
PropertyDescriptor.Builder()
@@ -160,7 +165,7 @@ public class JoltTransformJSON extends AbstractProcessor {
      * For some cases the key could be empty. It means that it represents 
default transform (e.g. for custom transform
      * when there is no jolt-record-spec specified).
      */
-    private LoadingCache<Optional<String>, JoltTransform> transformCache;
+    private Cache<Optional<String>, JoltTransform> transformCache;
 
     static {
         final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -188,8 +193,6 @@ public class JoltTransformJSON extends AbstractProcessor {
         return properties;
     }
 
-
-
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
         final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
@@ -208,7 +211,7 @@ public class JoltTransformJSON extends AbstractProcessor {
             final ClassLoader customClassLoader;
 
             try {
-                if (modulePath != null) {
+                if (modulePath != null && 
!validationContext.isExpressionLanguagePresent(modulePath)) {
                     customClassLoader = 
ClassLoaderUtils.getCustomClassLoader(modulePath, 
this.getClass().getClassLoader(), getJarFilenameFilter());
                 } else {
                     customClassLoader =  this.getClass().getClassLoader();
@@ -217,13 +220,21 @@ public class JoltTransformJSON extends AbstractProcessor {
                 final String specValue =  
validationContext.getProperty(JOLT_SPEC).getValue();
 
                 if (validationContext.isExpressionLanguagePresent(specValue)) {
-                    final String invalidExpressionMsg = 
validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true);
+                    final String invalidExpressionMsg = 
validationContext.newExpressionLanguageCompiler().validateExpression(specValue, 
true);
                     if (!StringUtils.isEmpty(invalidExpressionMsg)) {
                         results.add(new ValidationResult.Builder().valid(false)
                                 .subject(JOLT_SPEC.getDisplayName())
                                 .explanation("Invalid Expression Language: " + 
invalidExpressionMsg)
                                 .build());
                     }
+                } else if 
(validationContext.isExpressionLanguagePresent(customTransform)) {
+                    final String invalidExpressionMsg = 
validationContext.newExpressionLanguageCompiler().validateExpression(customTransform,
 true);
+                    if (!StringUtils.isEmpty(invalidExpressionMsg)) {
+                        results.add(new ValidationResult.Builder().valid(false)
+                                .subject(CUSTOM_CLASS.getDisplayName())
+                                .explanation("Invalid Expression Language: " + 
invalidExpressionMsg)
+                                .build());
+                    }
                 } else {
                     //for validation we want to be able to ensure the spec is 
syntactically correct and not try to resolve variables since they may not exist 
yet
                     Object specJson = SORTR.getValue().equals(transform) ? 
null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), 
DEFAULT_CHARSET);
@@ -242,7 +253,7 @@ public class JoltTransformJSON extends AbstractProcessor {
                     }
                 }
             } catch (final Exception e) {
-                getLogger().info("Processor is not valid - " + e.toString());
+                getLogger().error("processor is not valid: ", e);
                 String message = "Specification not valid for the selected 
transformation." ;
                 results.add(new ValidationResult.Builder().valid(false)
                         .explanation(message)
@@ -306,7 +317,7 @@ public class JoltTransformJSON extends AbstractProcessor {
         logger.info("Transformed {}", new Object[]{original});
     }
 
-    private JoltTransform getTransform(final ProcessContext context, final 
FlowFile flowFile) throws Exception {
+    private JoltTransform getTransform(final ProcessContext context, final 
FlowFile flowFile) {
         final Optional<String> specString;
         if (context.getProperty(JOLT_SPEC).isSet()) {
             specString = 
Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
@@ -314,7 +325,14 @@ public class JoltTransformJSON extends AbstractProcessor {
             specString = Optional.empty();
         }
 
-        return transformCache.get(specString);
+        return transformCache.get(specString, currString -> {
+            try {
+                return createTransform(context, currString.orElse(null), 
flowFile);
+            } catch (Exception e) {
+                getLogger().error("Problem getting transform", e);
+            }
+            return null;
+        });
     }
 
     @OnScheduled
@@ -322,11 +340,15 @@ public class JoltTransformJSON extends AbstractProcessor {
         int maxTransformsToCache = 
context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
         transformCache = Caffeine.newBuilder()
                 .maximumSize(maxTransformsToCache)
-                .build(specString -> createTransform(context, 
specString.orElse(null)));
+                .build();
 
         try {
             if (context.getProperty(MODULES).isSet()) {
-                customClassLoader = 
ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), 
this.getClass().getClassLoader(), getJarFilenameFilter());
+                customClassLoader = ClassLoaderUtils.getCustomClassLoader(
+                        
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+                        this.getClass().getClassLoader(),
+                        getJarFilenameFilter()
+                );
             } else {
                 customClassLoader = this.getClass().getClassLoader();
             }
@@ -335,7 +357,7 @@ public class JoltTransformJSON extends AbstractProcessor {
         }
     }
 
-    private JoltTransform createTransform(final ProcessContext context, final 
String specString) throws Exception {
+    private JoltTransform createTransform(final ProcessContext context, final 
String specString, final FlowFile flowFile) throws Exception {
         final Object specJson;
         if (context.getProperty(JOLT_SPEC).isSet() && 
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
             specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@@ -344,7 +366,7 @@ public class JoltTransformJSON extends AbstractProcessor {
         }
 
         if 
(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
-            return 
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+            return 
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 
context.getProperty(CUSTOM_CLASS).evaluateAttributeExpressions(flowFile).getValue(),
 specJson);
         } else {
             return 
TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), 
context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
index c548662..7e68c6b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoltTransformJSON.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -362,6 +363,32 @@ public class TestJoltTransformJSON {
     }
 
     @Test
+    public void testExpressionLanguageJarFile() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
JoltTransformJSON());
+        final String customJarPath = 
"src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
+        final String spec = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/chainrSpec.json")));
+        final String customJoltTransform = "TestCustomJoltTransform";
+
+        Map<String, String> customSpecs = new HashMap<>();
+        customSpecs.put("JOLT_SPEC", spec);
+        customSpecs.put("CUSTOM_JOLT_CLASS", customJoltTransform);
+        runner.setProperty(JoltTransformJSON.JOLT_SPEC, "${JOLT_SPEC}");
+        
runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"${CUSTOM_JOLT_CLASS}");
+        runner.setProperty(JoltTransformJSON.MODULES, "${CUSTOM_JAR}");
+        
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.CUSTOMR);
+        runner.setVariable("CUSTOM_JAR", customJarPath);
+        runner.enqueue(JSON_INPUT, customSpecs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+        final MockFlowFile transformed = 
runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
+        transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+        
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
+        Object transformedJson = JsonUtils.jsonToObject(new 
ByteArrayInputStream(transformed.toByteArray()));
+        Object compareJson = 
JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/chainrOutput.json")));
+        assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+    }
+
+    @Test
     public void testTransformInputWithCustomTransformationWithDir() throws 
IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
JoltTransformJSON());
         final String customJarPath = 
"src/test/resources/TestJoltTransformJson";

Reply via email to