Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2833#discussion_r210065027 --- Diff: nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.jolt.record; + +import com.bazaarvoice.jolt.ContextualTransform; +import com.bazaarvoice.jolt.JoltTransform; +import com.bazaarvoice.jolt.JsonUtils; +import com.bazaarvoice.jolt.Transform; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.jolt.record.util.TransformFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.ListRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; + +import java.io.FilenameFilter; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate"), +}) +@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created " + + "with transformed content and is routed to the 'success' relationship. If the transform " + + "fails, the original FlowFile is routed to the 'failure' relationship.") +@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "If the Jolt transform is applied to the entire record set, memory issues can occur " + + "for large record sets.") +public class JoltTransformRecord extends AbstractProcessor { + + static final AllowableValue SHIFTR + = new AllowableValue("jolt-transform-shift", "Shift", "Shift input data to create the output."); + static final AllowableValue CHAINR + = new AllowableValue("jolt-transform-chain", "Chain", "Execute list of Jolt transformations."); + static final AllowableValue DEFAULTR + = new AllowableValue("jolt-transform-default", "Default", " Apply default values to the output."); + static final AllowableValue REMOVR + = new AllowableValue("jolt-transform-remove", "Remove", " Remove values from input data to create the output."); + static final AllowableValue CARDINALITY + = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output."); + static final AllowableValue SORTR + = new AllowableValue("jolt-transform-sort", "Sort", "Sort input field name values alphabetically. Any specification set is ignored."); + static final AllowableValue CUSTOMR + = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name"); + static final AllowableValue MODIFIER_DEFAULTR + = new AllowableValue("jolt-transform-modify-default", "Modify - Default", "Writes when field name is missing or value is null"); + static final AllowableValue MODIFIER_OVERWRITER + = new AllowableValue("jolt-transform-modify-overwrite", "Modify - Overwrite", " Always overwrite value"); + static final AllowableValue MODIFIER_DEFINER + = new AllowableValue("jolt-transform-modify-define", "Modify - Define", "Writes when key is missing"); + + static final AllowableValue APPLY_TO_RECORD_SET + = new AllowableValue("jolt-record-apply-recordset", "Entire Record Set", "Applies the transformation to the record set as a whole. Used when " + + "values from multiple records are needed in the transformation."); + static final AllowableValue APPLY_TO_RECORDS + = new AllowableValue("jolt-record-apply-records", "Each Record", "Applies the transformation to each record individually."); + + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("jolt-record-record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("jolt-record-record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder() + .name("jolt-record-transform") + .displayName("Jolt Transformation DSL") + .description("Specifies the Jolt Transformation that should be used with the provided specification.") + .required(true) + .allowableValues(CARDINALITY, CHAINR, DEFAULTR, MODIFIER_DEFAULTR, MODIFIER_DEFINER, MODIFIER_OVERWRITER, REMOVR, SHIFTR, SORTR, CUSTOMR) + .defaultValue(CHAINR.getValue()) + .build(); + + static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder() + .name("jolt-record-spec") + .displayName("Jolt Specification") + .description("Jolt Specification for transform of record data. This value is ignored if the Jolt Sort Transformation is selected.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + static final PropertyDescriptor CUSTOM_CLASS = new PropertyDescriptor.Builder() + .name("jolt-record-custom-class") + .displayName("Custom Transformation Class Name") + .description("Fully Qualified Class Name for Custom Transformation") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder() + .name("jolt-record-custom-modules") + .displayName("Custom Module Directory") + .description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor TRANSFORM_STRATEGY = new PropertyDescriptor.Builder() + .name("jolt-record-transform-strategy") + .displayName("Transformation Strategy") + .description("Specifies whether the transform should be applied to the entire record set or to each individual record. Note that when the transform is applied to " + + "the entire record set, the first element in the spec should be an asterix (*) in order to match each record.") + .required(true) + .allowableValues(APPLY_TO_RECORD_SET, APPLY_TO_RECORDS) + .defaultValue(APPLY_TO_RECORDS.getValue()) + .build(); + + static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("jolt-record-transform-cache-size") + .displayName("Transform Cache Size") + .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need " + + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The FlowFile with transformed content will be routed to this relationship") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship") + .build(); + + private final static List<PropertyDescriptor> properties; + private final static Set<Relationship> relationships; + private volatile ClassLoader customClassLoader; + private final static String DEFAULT_CHARSET = "UTF-8"; + + // Cache is guarded by synchronizing on 'this'. + private volatile int maxTransformsToCache = 10; + private final Map<String, JoltTransform> transformCache = new LinkedHashMap<String, JoltTransform>() { + @Override + protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> eldest) { + final boolean evict = size() > maxTransformsToCache; + if (evict) { + getLogger().debug("Removing Jolt Transform from cache because cache is full"); + } + return evict; + } + }; + + static { + final List<PropertyDescriptor> _properties = new ArrayList<>(); + _properties.add(RECORD_READER); + _properties.add(RECORD_WRITER); + _properties.add(JOLT_TRANSFORM); + _properties.add(CUSTOM_CLASS); + _properties.add(MODULES); + _properties.add(JOLT_SPEC); + _properties.add(TRANSFORM_STRATEGY); + _properties.add(TRANSFORM_CACHE_SIZE); + properties = Collections.unmodifiableList(_properties); + + final Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext)); + final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue(); + final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue(); + final String modulePath = validationContext.getProperty(MODULES).isSet() ? validationContext.getProperty(MODULES).getValue() : null; + + if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) { + if (!SORTR.getValue().equals(transform)) { + final String message = "A specification is required for this transformation"; + results.add(new ValidationResult.Builder().valid(false) + .explanation(message) + .build()); + } + } else { + final ClassLoader customClassLoader; + + try { + if (modulePath != null) { --- End diff -- This is copy/paste from the original Jolt processor :) I'll replace it as you said and make sure all is well.
---