[ 
https://issues.apache.org/jira/browse/NIFI-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396644#comment-15396644
 ] 

ASF GitHub Bot commented on NIFI-1942:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72542364
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
 ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a 
user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for 
some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = 
Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", 
"Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", 
"StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the 
whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' 
a FlowFile containing the valid lines and"
    +            + " to 'invalid' a FlowFile containing the invalid lines";
    +
    +    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new 
AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
    +            "In case an error is found in the CSV file, the whole flow 
file will be routed to the 'invalid' relationship. "
    +                    + "This option offers best performances.");
    +
    +    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new 
AllowableValue(routeLinesIndividually, routeLinesIndividually,
    +            "In case an error is found, the input CSV file will be split 
into two FlowFiles: one routed to the 'valid' "
    +                    + "relationship containing all the correct lines and 
one routed to the 'invalid' relationship containing all "
    +                    + "the incorrect lines. Take care if choosing this 
option while using Unique cell processors in schema definition.");
    +
    +    public static final PropertyDescriptor SCHEMA = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is 
expected a comma-delimited string representing the cell "
    +                    + "processors to apply. The following cell processors 
are allowed in the schema definition: "
    +                    + allowedOperators.toString() + ". Note: cell 
processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header 
to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. 
Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming 
data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming 
data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor VALIDATION_STRATEGY = new 
PropertyDescriptor.Builder()
    +            .name("validate-csv-strategy")
    +            .displayName("Validation strategy")
    +            .description("Strategy to apply when routing input files to 
output relationships.")
    +            .required(true)
    +            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
    +            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, 
VALIDATE_WHOLE_FLOWFILE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated 
against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new 
Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the 
specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new 
AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new 
AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        properties.add(VALIDATION_STRATEGY);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            
this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new 
ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + 
e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new 
CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                
context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                
context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is 
converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new 
ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new 
CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> 
processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, 
subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), 
subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, 
argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does 
not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal 
does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not 
expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not 
expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not 
expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not 
expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not 
expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, 
argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not 
expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode 
does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), 
Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), 
Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), 
Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not 
expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.replaceAll("\"", 
"").split(",[ ]*");
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.replaceAll("\"", 
"").split(",[ ]*");
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty 
does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not 
expect any argument but has " + argument);
    +                return null;
    +
    +            case "isincludedin":
    +                String[] elements = argument.replaceAll("\"", 
"").split(",[ ]*");
    +                return new IsIncludedIn(elements);
    +
    +            default:
    +                throw new IllegalArgumentException("[" + method + "] is 
not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get();
    +        final boolean isWholeFFValidation = 
context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
    +
    +        final AtomicReference<Boolean> valid = new 
AtomicReference<Boolean>(true);
    +        final AtomicReference<Boolean> isFirstLine = new 
AtomicReference<Boolean>(true);
    +        final AtomicReference<Integer> okCount = new 
AtomicReference<Integer>(0);
    +        final AtomicReference<Integer> totalCount = new 
AtomicReference<Integer>(0);
    +        final AtomicReference<FlowFile> invalidFF = new 
AtomicReference<FlowFile>(null);
    +        final AtomicReference<FlowFile> validFF = new 
AtomicReference<FlowFile>(null);
    +
    +        if(!isWholeFFValidation) {
    +            invalidFF.set(session.create(flowFile));
    +            validFF.set(session.create(flowFile));
    +        }
    +
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                NifiCsvListReader listReader = null;
    +                try {
    +                    listReader = new NifiCsvListReader(new 
InputStreamReader(in), csvPref);
    +
    +                    // handling of header
    +                    if(header) {
    +                        List<String> headerList = listReader.read();
    +                        if(!isWholeFFValidation) {
    +                            invalidFF.set(session.append(invalidFF.get(), 
new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) 
throws IOException {
    +                                    out.write(print(headerList, csvPref, 
isFirstLine.get()));
    +                                }
    +                            }));
    +                            validFF.set(session.append(validFF.get(), new 
OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) 
throws IOException {
    +                                    out.write(print(headerList, csvPref, 
isFirstLine.get()));
    +                                }
    +                            }));
    +                            isFirstLine.set(false);
    +                        }
    +                    }
    +
    +                    boolean stop = false;
    +
    +                    while (!stop) {
    +                        try {
    +
    +                            final List<Object> list = 
listReader.read(cellProcs);
    +                            stop = list == null;
    +
    +                            if(!isWholeFFValidation && !stop) {
    +                                validFF.set(session.append(validFF.get(), 
new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) 
throws IOException {
    +                                        out.write(print(list, csvPref, 
isFirstLine.get()));
    +                                    }
    +                                }));
    +                                okCount.set(okCount.get() + 1);
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +
    +                        } catch (final SuperCsvCellProcessorException e) {
    +                            valid.set(false);
    +                            if(isWholeFFValidation) {
    +                                logger.debug("Failed to validate {} 
against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
    +                                break;
    +                            } else {
    +                                // we append the invalid line to the flow 
file that will be routed to invalid relationship
    +                                
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) 
throws IOException {
    +                                        
out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +                        } finally {
    +                            if(!isWholeFFValidation) {
    +                                totalCount.set(totalCount.get() + 1);
    +                            }
    +                        }
    +                    }
    +
    +                } catch (final IOException e) {
    +                    valid.set(false);
    +                    logger.error("Failed to validate {} against schema due 
to {}", new Object[]{flowFile}, e);
    +                } finally {
    +                    if(listReader != null) {
    +                        listReader.close();
    +                    }
    +                }
    +            }
    +        });
    +
    +        if(isWholeFFValidation) {
    +            if (valid.get()) {
    +                logger.info("Successfully validated {} against schema; 
routing to 'valid'", new Object[]{flowFile});
    +                session.getProvenanceReporter().route(flowFile, REL_VALID);
    +                session.transfer(flowFile, REL_VALID);
    +            } else {
    +                session.getProvenanceReporter().route(flowFile, 
REL_INVALID);
    +                session.transfer(flowFile, REL_INVALID);
    +            }
    +        } else {
    +            if (valid.get()) {
    +                logger.debug("Successfully validated {} against schema; 
routing to 'valid'", new Object[]{validFF.get()});
    +                session.getProvenanceReporter().route(validFF.get(), 
REL_VALID, "All " + totalCount.get() + " line(s) are valid");
    +                session.transfer(validFF.get(), REL_VALID);
    +                session.remove(invalidFF.get());
    +                session.remove(flowFile);
    +            } else if (okCount.get() != 0) {
    +                if(header) {
    +                    totalCount.set(totalCount.get() - 1);
    +                }
    +
    +                logger.debug("Successfully validated {}/{} line(s) in {} 
against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
    +                        new Object[]{okCount.get(), totalCount.get(), 
flowFile});
    +                session.getProvenanceReporter().route(validFF.get(), 
REL_VALID, okCount.get() + " valid line(s)");
    +                session.transfer(validFF.get(), REL_VALID);
    +                session.getProvenanceReporter().route(invalidFF.get(), 
REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
    +                session.transfer(invalidFF.get(), REL_INVALID);
    +                session.remove(flowFile);
    +            } else {
    +                logger.debug("All lines in {} are invalid; routing to 
'invalid'", new Object[]{invalidFF.get()});
    +                session.getProvenanceReporter().route(invalidFF.get(), 
REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
    +                session.transfer(invalidFF.get(), REL_INVALID);
    +                session.remove(validFF.get());
    +                session.remove(flowFile);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Method used to correctly write the lines by taking into account end 
of line
    +     * character and separator character.
    +     * @param list list of elements of the current row
    +     * @param csvPref CSV preferences
    +     * @param isFirstLine true if this is the first line we append
    +     * @return String to append in the flow file
    +     */
    +    private byte[] print(List<?> list, CsvPreference csvPref, boolean 
isFirstLine) {
    --- End diff --
    
    Not sure why but after being routed through the processor instead of '\n' 
the new line character the literals "\" and "n" were added. Below are two 
screen shots of the data below and after (only line routed to invalid)
    
    ![screen shot 2016-07-27 at 7 23 52 
pm](https://cloud.githubusercontent.com/assets/11302527/17196060/e4267118-542f-11e6-8831-83f8389a615a.png)
    ![screen shot 2016-07-27 at 7 23 55 
pm](https://cloud.githubusercontent.com/assets/11302527/17196059/e424d600-542f-11e6-8e35-5f1274912ef1.png)
     


> Create a processor to validate CSV against a user-supplied schema
> -----------------------------------------------------------------
>
>                 Key: NIFI-1942
>                 URL: https://issues.apache.org/jira/browse/NIFI-1942
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Minor
>             Fix For: 1.0.0
>
>         Attachments: ValidateCSV.xml
>
>
> In order to extend the set of "quality control" processors, it would be 
> interesting to have a processor validating CSV formatted flow files against a 
> user-specified schema.
> Flow file validated against schema would be routed to "valid" relationship 
> although flow file not validated against schema would be routed to "invalid" 
> relationship.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to