Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2833#discussion_r210065027
  
    --- Diff: 
nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
 ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.jolt.record;
    +
    +import com.bazaarvoice.jolt.ContextualTransform;
    +import com.bazaarvoice.jolt.JoltTransform;
    +import com.bazaarvoice.jolt.JsonUtils;
    +import com.bazaarvoice.jolt.Transform;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.SystemResource;
    +import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.jolt.record.util.TransformFactory;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.ListRecordSet;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
    +
    +import java.io.FilenameFilter;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", 
"removr", "cardinality", "sort"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "record.count", description = "The 
number of records in an outgoing FlowFile"),
    +        @WritesAttribute(attribute = "mime.type", description = "The MIME 
Type that the configured Record Writer indicates is appropriate"),
    +})
    +@CapabilityDescription("Applies a list of Jolt specifications to the 
FlowFile payload. A new FlowFile is created "
    +        + "with transformed content and is routed to the 'success' 
relationship. If the transform "
    +        + "fails, the original FlowFile is routed to the 'failure' 
relationship.")
    +@SystemResourceConsideration(resource = SystemResource.MEMORY, description 
= "If the Jolt transform is applied to the entire record set, memory issues can 
occur "
    +        + "for large record sets.")
    +public class JoltTransformRecord extends AbstractProcessor {
    +
    +    static final AllowableValue SHIFTR
    +            = new AllowableValue("jolt-transform-shift", "Shift", "Shift 
input data to create the output.");
    +    static final AllowableValue CHAINR
    +            = new AllowableValue("jolt-transform-chain", "Chain", "Execute 
list of Jolt transformations.");
    +    static final AllowableValue DEFAULTR
    +            = new AllowableValue("jolt-transform-default", "Default", " 
Apply default values to the output.");
    +    static final AllowableValue REMOVR
    +            = new AllowableValue("jolt-transform-remove", "Remove", " 
Remove values from input data to create the output.");
    +    static final AllowableValue CARDINALITY
    +            = new AllowableValue("jolt-transform-card", "Cardinality", 
"Change the cardinality of input elements to create the output.");
    +    static final AllowableValue SORTR
    +            = new AllowableValue("jolt-transform-sort", "Sort", "Sort 
input field name values alphabetically. Any specification set is ignored.");
    +    static final AllowableValue CUSTOMR
    +            = new AllowableValue("jolt-transform-custom", "Custom", 
"Custom Transformation. Requires Custom Transformation Class Name");
    +    static final AllowableValue MODIFIER_DEFAULTR
    +            = new AllowableValue("jolt-transform-modify-default", "Modify 
- Default", "Writes when field name is missing or value is null");
    +    static final AllowableValue MODIFIER_OVERWRITER
    +            = new AllowableValue("jolt-transform-modify-overwrite", 
"Modify - Overwrite", " Always overwrite value");
    +    static final AllowableValue MODIFIER_DEFINER
    +            = new AllowableValue("jolt-transform-modify-define", "Modify - 
Define", "Writes when key is missing");
    +
    +    static final AllowableValue APPLY_TO_RECORD_SET
    +            = new AllowableValue("jolt-record-apply-recordset", "Entire 
Record Set", "Applies the transformation to the record set as a whole. Used 
when "
    +            + "values from multiple records are needed in the 
transformation.");
    +    static final AllowableValue APPLY_TO_RECORDS
    +            = new AllowableValue("jolt-record-apply-records", "Each 
Record", "Applies the transformation to each record individually.");
    +
    +
    +    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
    +            .name("jolt-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for 
parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("jolt-record-record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for 
writing out the records")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor JOLT_TRANSFORM = new 
PropertyDescriptor.Builder()
    +            .name("jolt-record-transform")
    +            .displayName("Jolt Transformation DSL")
    +            .description("Specifies the Jolt Transformation that should be 
used with the provided specification.")
    +            .required(true)
    +            .allowableValues(CARDINALITY, CHAINR, DEFAULTR, 
MODIFIER_DEFAULTR, MODIFIER_DEFINER, MODIFIER_OVERWRITER, REMOVR, SHIFTR, 
SORTR, CUSTOMR)
    +            .defaultValue(CHAINR.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor JOLT_SPEC = new 
PropertyDescriptor.Builder()
    +            .name("jolt-record-spec")
    +            .displayName("Jolt Specification")
    +            .description("Jolt Specification for transform of record data. 
This value is ignored if the Jolt Sort Transformation is selected.")
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .required(false)
    +            .build();
    +
    +    static final PropertyDescriptor CUSTOM_CLASS = new 
PropertyDescriptor.Builder()
    +            .name("jolt-record-custom-class")
    +            .displayName("Custom Transformation Class Name")
    +            .description("Fully Qualified Class Name for Custom 
Transformation")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor MODULES = new 
PropertyDescriptor.Builder()
    +            .name("jolt-record-custom-modules")
    +            .displayName("Custom Module Directory")
    +            .description("Comma-separated list of paths to files and/or 
directories which contain modules containing custom transformations (that are 
not included on NiFi's classpath).")
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    // Selects whether the Jolt transform is applied once to the whole record set or once per record.
    +    // Required; defaults to per-record application (APPLY_TO_RECORDS).
    +    static final PropertyDescriptor TRANSFORM_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("jolt-record-transform-strategy")
    +            .displayName("Transformation Strategy")
    +            .description("Specifies whether the transform should be applied to the entire record set or to each individual record. Note that when the transform is applied to "
    +                    + "the entire record set, the first element in the spec should be an asterisk (*) in order to match each record.")
    +            .required(true)
    +            .allowableValues(APPLY_TO_RECORD_SET, APPLY_TO_RECORDS)
    +            .defaultValue(APPLY_TO_RECORDS.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("jolt-record-transform-cache-size")
    +            .displayName("Transform Cache Size")
    +            .description("Compiling a Jolt Transform can be fairly 
expensive. Ideally, this will be done only once. However, if the Expression 
Language is used in the transform, we may need "
    +                    + "a new Transform for each FlowFile. This value 
controls how many of those Transforms we cache in memory in order to avoid 
having to compile the Transform each time.")
    +            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .required(true)
    +            .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The FlowFile with transformed content will be 
routed to this relationship")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("If a FlowFile fails processing for any reason 
(for example, the FlowFile records cannot be parsed), it will be routed to this 
relationship")
    +            .build();
    +
    +    // Class-wide property and relationship collections; both are assigned exactly once in the static initializer below.
    +    private final static List<PropertyDescriptor> properties;
    +    private final static Set<Relationship> relationships;
    +    // NOTE(review): not assigned in the visible part of this diff; presumably built from the Custom Module Directory -- confirm.
    +    private volatile ClassLoader customClassLoader;
    +    private final static String DEFAULT_CHARSET = "UTF-8";
    +
    +    // Cache is guarded by synchronizing on 'this'.
    +    // Upper bound on the number of cached transforms; starts at 10.
    +    // NOTE(review): presumably overwritten from the Transform Cache Size property (whose default is 1) when scheduled -- confirm.
    +    private volatile int maxTransformsToCache = 10;
    +    // Built with the no-arg LinkedHashMap constructor, so entries are kept in insertion order and the
    +    // "eldest" entry offered for eviction is the one that was inserted first.
    +    private final Map<String, JoltTransform> transformCache = new 
LinkedHashMap<String, JoltTransform>() {
    +        @Override
    +        protected boolean removeEldestEntry(Map.Entry<String, 
JoltTransform> eldest) {
    +            // Evict once the map has grown past the configured maximum.
    +            final boolean evict = size() > maxTransformsToCache;
    +            if (evict) {
    +                getLogger().debug("Removing Jolt Transform from cache 
because cache is full");
    +            }
    +            return evict;
    +        }
    +    };
    +
    +    // Builds the immutable property list and relationship set shared by every instance of this processor.
    +    static {
    +        // The order of this list is the order in which the descriptors are stored and returned.
    +        final List<PropertyDescriptor> supportedProperties = new ArrayList<>(Arrays.asList(
    +                RECORD_READER,
    +                RECORD_WRITER,
    +                JOLT_TRANSFORM,
    +                CUSTOM_CLASS,
    +                MODULES,
    +                JOLT_SPEC,
    +                TRANSFORM_STRATEGY,
    +                TRANSFORM_CACHE_SIZE));
    +        properties = Collections.unmodifiableList(supportedProperties);
    +
    +        final Set<Relationship> supportedRelationships = new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
    +        relationships = Collections.unmodifiableSet(supportedRelationships);
    +    }
    +
    +    /**
    +     * Returns the relationships this processor can route FlowFiles to.
    +     *
    +     * @return the unmodifiable set built in the static initializer, containing
    +     *         {@code REL_SUCCESS} and {@code REL_FAILURE}
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Returns the property descriptors supported by this processor.
    +     *
    +     * @return the unmodifiable list built in the static initializer, in the order
    +     *         the descriptors were added there
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
    +        final String transform = 
validationContext.getProperty(JOLT_TRANSFORM).getValue();
    +        final String customTransform = 
validationContext.getProperty(CUSTOM_CLASS).getValue();
    +        final String modulePath = 
validationContext.getProperty(MODULES).isSet() ? 
validationContext.getProperty(MODULES).getValue() : null;
    +
    +        if (!validationContext.getProperty(JOLT_SPEC).isSet() || 
StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
    +            if (!SORTR.getValue().equals(transform)) {
    +                final String message = "A specification is required for 
this transformation";
    +                results.add(new ValidationResult.Builder().valid(false)
    +                        .explanation(message)
    +                        .build());
    +            }
    +        } else {
    +            final ClassLoader customClassLoader;
    +
    +            try {
    +                if (modulePath != null) {
    --- End diff --
    
    This is copy/paste from the original Jolt processor :) I'll replace it as 
you said and make sure all is well.


---

Reply via email to