[ 
https://issues.apache.org/jira/browse/NIFI-1829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308223#comment-15308223
 ] 

ASF GitHub Bot commented on NIFI-1829:
--------------------------------------

Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/458#discussion_r65231261
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
 ---
    @@ -0,0 +1,414 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@EventDriven()
    +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"})
    +@CapabilityDescription("This processor aids in the testing and debugging 
of the flowfile framework by allowing "
    +        + "a developer or flow manager to force various responses to a 
flowfile.  In response to a receiving a flowfile "
    +        + "it can route to the success or failure relationships, rollback, 
rollback and yield, rollback with penalty, "
    +        + "or throw an exception.  In addition, if using a timer based 
scheduling strategy, upon being triggered without "
    +        + "a flowfile, it can be configured to throw an exception,  when 
triggered without a flowfile.\n"
    +        + "\n"
    +        + "The 'iterations' properties, such as \"Success iterations\", 
configure how many times each response should occur "
    +        + "in succession before moving on to the next response within the 
group of flowfile responses or no flowfile"
    +        + "responses.\n"
    +        + "\n"
    +        + "The order of responses when a flow file is received are:"
    +        + "  1. transfer flowfile to success relationship.\n"
    +        + "  2. transfer flowfile to failure relationship.\n"
    +        + "  3. rollback the flowfile without penalty.\n"
    +        + "  4. rollback the flowfile and yield the context.\n"
    +        + "  5. rollback the flowfile with penalty.\n"
    +        + "  6. throw an exception.\n"
    +        + "\n"
    +        + "The order of responses when no flow file is received are:"
    +        + "  1. yield the context.\n"
    +        + "  2. throw an exception.\n"
    +        + "  3. do nothing and return.\n"
    +        + "\n"
    +        + "By default, the processor is configured to perform each 
response one time.  After processing the list of "
    +        + "responses it will resume from the top of the list.\n"
    +        + "\n"
    +        + "To suppress any response, it's value can be set to zero (0) and 
no responses of that type will occur during "
    +        + "processing.")
    +public class DebugFlow extends AbstractProcessor {
    +
    +    private Set<Relationship> relationships = null;
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flowfiles processed successfully.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flowfiles that failed to process.")
    +            .build();
    +
    +    private List<PropertyDescriptor> propertyDescriptors = null;
    +
    +    static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("Success Iterations")
    +            .description("Number of flowfiles to forward to success 
relationship.")
    +            .required(true)
    +            .defaultValue("1")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_FAILURE_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("Failure Iterations")
    +            .description("Number of flowfiles to forward to failure 
relationship.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("Rollback Iterations")
    +            .description("Number of flowfiles to roll back (without 
penalty).")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("Rollback Yield Iterations")
    +            .description("Number of flowfiles to roll back and yield.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("Rollback Penalty Iterations")
    +            .description("Number of flowfiles to roll back with penalty.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("Exception Iterations")
    +            .description("Number of flowfiles to throw NPE exception.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("No Flowfile Exception Iterations")
    +            .description("Number of times to throw NPE exception if no 
flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("No Flowfile Yield Iterations")
    +            .description("Number of times to yield if no flowfile.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new 
PropertyDescriptor.Builder()
    +            .name("No Flowfile Skip Iterations")
    +            .description("Number of times to skip onTrigger if no 
flowfile.")
    +            .required(true)
    +            .defaultValue("1")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    Integer FF_SUCCESS_MAX = 0;
    +    private Integer FF_FAILURE_MAX = 0;
    +    private Integer FF_ROLLBACK_MAX = 0;
    +    private Integer FF_YIELD_MAX = 0;
    +    private Integer FF_PENALTY_MAX = 0;
    +    private Integer FF_EXCEPTION_MAX = 0;
    +
    +    private Integer NO_FF_EXCEPTION_MAX = 0;
    +    private Integer NO_FF_YIELD_MAX = 0;
    +    private Integer NO_FF_SKIP_MAX = 0;
    +
    +    private Integer FF_SUCCESS_CURR = 0;
    +    private Integer FF_FAILURE_CURR = 0;
    +    private Integer FF_ROLLBACK_CURR = 0;
    +    private Integer FF_YIELD_CURR = 0;
    +    private Integer FF_PENALTY_CURR = 0;
    +    private Integer FF_EXCEPTION_CURR = 0;
    +
    +    private Integer NO_FF_EXCEPTION_CURR = 0;
    +    private Integer NO_FF_YIELD_CURR = 0;
    +    private Integer NO_FF_SKIP_CURR = 0;
    +
    +    private FlowfileResponse curr_ff_resp;
    +    private NoFlowfileResponse curr_noff_resp;
    +
    +    private enum FlowfileResponse {
    +        FF_SUCCESS_RESPONSE(0, 1),
    +        FF_FAILURE_RESPONSE(1, 2),
    +        FF_ROLLBACK_RESPONSE(2, 3),
    +        FF_YIELD_RESPONSE(3, 4),
    +        FF_PENALTY_RESPONSE(4, 5),
    +        FF_EXCEPTION_RESPONSE(5, 0);
    +
    +        private Integer id;
    +        private Integer nextId;
    +        private FlowfileResponse next;
    +
    +        private static final Map<Integer, FlowfileResponse> byId = new 
HashMap<>();
    +        static {
    +            for (FlowfileResponse rc : FlowfileResponse.values()) {
    +                if (byId.put(rc.id, rc) != null) {
    +                    throw new IllegalArgumentException("duplicate id: " + 
rc.id);
    +                }
    +            }
    +            for (FlowfileResponse rc : FlowfileResponse.values()) {
    +                rc.next = byId.get(rc.nextId);
    +            }
    +        }
    +        FlowfileResponse(Integer pId, Integer pNext) {
    +            id = pId;
    +            nextId = pNext;
    +        }
    +        FlowfileResponse getNextCycle() {
    +            return next;
    +        }
    +    }
    +
    +    private enum NoFlowfileResponse {
    +        NO_FF_EXCEPTION_RESPONSE(0, 1),
    +        NO_FF_YIELD_RESPONSE(1, 2),
    +        NO_FF_SKIP_RESPONSE(2, 0);
    +
    +        private Integer id;
    +        private Integer nextId;
    +        private NoFlowfileResponse next;
    +
    +        private static final Map<Integer, NoFlowfileResponse> byId = new 
HashMap<>();
    +        static {
    +            for (NoFlowfileResponse rc : NoFlowfileResponse.values()) {
    +                if (byId.put(rc.id, rc) != null) {
    +                    throw new IllegalArgumentException("duplicate id: " + 
rc.id);
    +                }
    +            }
    +            for (NoFlowfileResponse rc : NoFlowfileResponse.values()) {
    +                rc.next = byId.get(rc.nextId);
    +            }
    +        }
    +        NoFlowfileResponse(Integer pId, Integer pNext) {
    +            id = pId;
    +            nextId = pNext;
    +        }
    +        NoFlowfileResponse getNextCycle() {
    +            return next;
    +        }
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        if (relationships == null) {
    +            HashSet<Relationship> relSet = new HashSet<>();
    +            relSet.add(REL_SUCCESS);
    +            relSet.add(REL_FAILURE);
    +            relationships = Collections.unmodifiableSet(relSet);
    +        }
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        if (propertyDescriptors == null) {
    +            ArrayList<PropertyDescriptor> propList = new ArrayList<>();
    +            propList.add(FF_SUCCESS_ITERATIONS);
    +            propList.add(FF_FAILURE_ITERATIONS);
    +            propList.add(FF_ROLLBACK_ITERATIONS);
    +            propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
    +            propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
    +            propList.add(FF_EXCEPTION_ITERATIONS);
    +            propList.add(NO_FF_EXCEPTION_ITERATIONS);
    +            propList.add(NO_FF_YIELD_ITERATIONS);
    +            propList.add(NO_FF_SKIP_ITERATIONS);
    +            propertyDescriptors = Collections.unmodifiableList(propList);
    +        }
    +        return propertyDescriptors;
    +    }
    +
    +    @SuppressWarnings("unused")
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        FF_SUCCESS_MAX = 
context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
    +        FF_FAILURE_MAX = 
context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
    +        FF_YIELD_MAX = 
context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
    +        FF_ROLLBACK_MAX = 
context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
    +        FF_PENALTY_MAX = 
context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
    +        FF_EXCEPTION_MAX = 
context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
    +        NO_FF_EXCEPTION_MAX = 
context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
    +        NO_FF_YIELD_MAX = 
context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
    +        NO_FF_SKIP_MAX = 
context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
    +        curr_ff_resp = FlowfileResponse.FF_SUCCESS_RESPONSE;
    +        curr_noff_resp = NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        final ProcessorLog logger = getLogger();
    +
    +        FlowFile ff = session.get();
    +
    +        // Make up to 2 passes to allow rollover from last cycle to first.
    +        // (This could be "while(true)" since responses should break out  
if selected, but this
    +        //  prevents endless loops in the event of unexpected errors or 
future changes.)
    +        int pass = 2;
    +        while (pass > 0) {
    +            pass -= 1;
    +            if (ff == null) {
    +                if (curr_noff_resp == 
NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE) {
    +                    if (NO_FF_EXCEPTION_CURR < NO_FF_EXCEPTION_MAX) {
    +                        NO_FF_EXCEPTION_CURR += 1;
    +                        logger.info("DebugFlow throwing NPE with no flow 
file");
    +                        throw new NullPointerException("forced by " + 
this.getClass().getName());
    +                    } else {
    +                        NO_FF_EXCEPTION_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp == 
NoFlowfileResponse.NO_FF_YIELD_RESPONSE) {
    +                    if (NO_FF_YIELD_CURR < NO_FF_YIELD_MAX) {
    +                        NO_FF_YIELD_CURR += 1;
    +                        logger.info("DebugFlow yielding with no flow 
file");
    +                        context.yield();
    +                        break;
    +                    } else {
    +                        NO_FF_YIELD_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_noff_resp == 
NoFlowfileResponse.NO_FF_SKIP_RESPONSE) {
    +                    if (NO_FF_SKIP_CURR < NO_FF_SKIP_MAX) {
    +                        NO_FF_SKIP_CURR += 1;
    +                        logger.info("DebugFlow skipping with no flow 
file");
    +                        return;
    +                    } else {
    +                        NO_FF_SKIP_CURR = 0;
    +                        curr_noff_resp = curr_noff_resp.getNextCycle();
    +                    }
    +                }
    +                return;
    +            } else {
    +                if (curr_ff_resp == FlowfileResponse.FF_SUCCESS_RESPONSE) {
    +                    if (FF_SUCCESS_CURR < FF_SUCCESS_MAX) {
    +                        FF_SUCCESS_CURR += 1;
    +                        logger.info("DebugFlow transferring to success 
file={} UUID={}",
    +                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        
ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.transfer(ff, REL_SUCCESS);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_SUCCESS_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_FAILURE_RESPONSE) {
    +                    if (FF_FAILURE_CURR < FF_FAILURE_MAX) {
    +                        FF_FAILURE_CURR += 1;
    +                        logger.info("DebugFlow transferring to failure 
file={} UUID={}",
    +                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        
ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.transfer(ff, REL_FAILURE);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_FAILURE_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_ROLLBACK_RESPONSE) 
{
    +                    if (FF_ROLLBACK_CURR < FF_ROLLBACK_MAX) {
    +                        FF_ROLLBACK_CURR += 1;
    +                        logger.info("DebugFlow rolling back (no penalty) 
file={} UUID={}",
    +                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        
ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback();
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_ROLLBACK_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_YIELD_RESPONSE) {
    +                    if (FF_YIELD_CURR < FF_YIELD_MAX) {
    +                        FF_YIELD_CURR += 1;
    +                        logger.info("DebugFlow yielding file={} UUID={}",
    +                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        
ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback();
    +                        context.yield();
    +                        return;
    +                    } else {
    +                        FF_YIELD_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == FlowfileResponse.FF_PENALTY_RESPONSE) {
    +                    if (FF_PENALTY_CURR < FF_PENALTY_MAX) {
    +                        FF_PENALTY_CURR += 1;
    +                        logger.info("DebugFlow rolling back (with penalty) 
file={} UUID={}",
    +                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        
ff.getAttribute(CoreAttributes.UUID.key())});
    +                        session.rollback(true);
    +                        session.commit();
    +                        break;
    +                    } else {
    +                        FF_PENALTY_CURR = 0;
    +                        curr_ff_resp = curr_ff_resp.getNextCycle();
    +                    }
    +                }
    +                if (curr_ff_resp == 
FlowfileResponse.FF_EXCEPTION_RESPONSE) {
    +                    if (FF_EXCEPTION_CURR < FF_EXCEPTION_MAX) {
    +                        FF_EXCEPTION_CURR += 1;
    +                        logger.info("DebugFlow throwing NPE file={} 
UUID={}",
    +                                new 
Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
    +                                        
ff.getAttribute(CoreAttributes.UUID.key())});
    +                        throw new NullPointerException("forced by " + 
this.getClass().getName());
    --- End diff --
    
    Why an NPE here? Would being allowed to specify the class of a Throwable in 
the configuration to throw here be overkill?


> Create FlowDebugger processor
> -----------------------------
>
>                 Key: NIFI-1829
>                 URL: https://issues.apache.org/jira/browse/NIFI-1829
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joe Skora
>            Assignee: Joe Skora
>            Priority: Minor
>             Fix For: 1.0.0, 0.7.0
>
>
> The FlowDebugger processor allows a variety of Processor responses and 
> failures to be simulated for testing, debugging, and troubleshooting the 
> framework.
> Responses it can produce on receipt of a flowfile include Transfer to 
> Success, Transfer to Failure, Rollback without Penalty, Rollback and Yield, 
> Rollback with Penalty, and Throw an Exception.  The properties indicate how 
> many times that response should be thrown, for example if configured with 
> Success=10 and Failure=40, it will transfer the first 10 flowfiles to 
> Success, transfer the next 40 to Failure, and then repeat.
> Similarly, responses it can produce when triggered without a flowfile include 
> Throw an Exception, Yield, and Return (do nothing).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to