[ 
https://issues.apache.org/jira/browse/NIFI-5231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489768#comment-16489768
 ] 

ASF GitHub Bot commented on NIFI-5231:
--------------------------------------

Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2737#discussion_r190714515
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RecordStats.java
 ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.RecordPathResult;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +public class RecordStats extends AbstractProcessor {
    +    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
    +        .name("record-stats-reader")
    +        .displayName("Record Reader")
    +        .description("A record reader to use for reading the records.")
    +        .addValidator(Validator.VALID)
    +        .identifiesControllerService(RecordReaderFactory.class)
    +        .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +        .name("success")
    +        .description("If a flowfile is successfully processed, it goes 
here.")
    +        .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +        .name("failure")
    +        .description("If a flowfile fails to be processed, it goes here.")
    +        .build();
    +
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .displayName(propertyDescriptorName)
    +            .dynamic(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    }
    +
    +    private RecordPathCache cache;
    +
    +    @OnScheduled
    +    public void onEnabled(ProcessContext context) {
    +        cache = new RecordPathCache(25);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return new HashSet<Relationship>() {{
    +            add(REL_SUCCESS);
    +            add(REL_FAILURE);
    +        }};
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile input = session.get();
    +        if (input == null) {
    +            return;
    +        }
    +
    +        try {
    +            Map<String, RecordPath> paths = getRecordPaths(context);
    +            Map<String, String> stats = getStats(input, paths, context, 
session);
    +
    +            input = session.putAllAttributes(input, stats);
    +
    +            session.transfer(input, REL_SUCCESS);
    +
    +        } catch (Exception ex) {
    +            getLogger().error("Error processing stats.", ex);
    +            session.transfer(input, REL_FAILURE);
    +        }
    +
    +    }
    +
    +    protected Map<String, RecordPath> getRecordPaths(ProcessContext 
context) {
    +        return context.getProperties().keySet()
    +            .stream().filter(p -> p.isDynamic() && 
!p.getName().contains(RECORD_READER.getName()))
    +            .collect(Collectors.toMap(
    +                e -> e.getName(),
    +                e ->  {
    +                    String val = context.getProperty(e).getValue();
    +                    return cache.getCompiled(val);
    +                })
    +            );
    +    }
    +
    +    protected Map<String, String> getStats(FlowFile input, Map<String, 
RecordPath> paths, ProcessContext context, ProcessSession session) {
    +        try (InputStream is = session.read(input)) {
    +            RecordReaderFactory factory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    --- End diff --
    
    Using `input` and `is` as variable names is confusing here; can we name them 
closer to what they are, to keep them straight?
    For example, `flowFile` and `inputStream`?
    There is only one flowFile to track in this processor, so there is no need to 
get fancy ;)


> Record stats processor
> ----------------------
>
>                 Key: NIFI-5231
>                 URL: https://issues.apache.org/jira/browse/NIFI-5231
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Mike Thomsen
>            Assignee: Mike Thomsen
>            Priority: Major
>
> Should do the following:
>  
>  # Take a record reader.
>  # Count the # of records and add a record_count attribute to the flowfile.
>  # Allow user-defined properties that do the following:
>  ## Map attribute name -> record path.
>  ## Provide aggregate value counts for each record path statement.
>  ## Provide total count for record path operation.
>  ## Put those values on the flowfile as attributes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to