[ https://issues.apache.org/jira/browse/NIFI-1829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308223#comment-15308223 ]
ASF GitHub Bot commented on NIFI-1829: -------------------------------------- Github user trkurc commented on a diff in the pull request: https://github.com/apache/nifi/pull/458#discussion_r65231261 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java --- @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@EventDriven() +@Tags({"test", "debug", "processor", "utility", "flow", "flowfile"}) +@CapabilityDescription("This processor aids in the testing and debugging of the flowfile framework by allowing " + + "a developer or flow manager to force various responses to a flowfile. In response to a receiving a flowfile " + + "it can route to the success or failure relationships, rollback, rollback and yield, rollback with penalty, " + + "or throw an exception. In addition, if using a timer based scheduling strategy, upon being triggered without " + + "a flowfile, it can be configured to throw an exception, when triggered without a flowfile.\n" + + "\n" + + "The 'iterations' properties, such as \"Success iterations\", configure how many times each response should occur " + + "in succession before moving on to the next response within the group of flowfile responses or no flowfile" + + "responses.\n" + + "\n" + + "The order of responses when a flow file is received are:" + + " 1. 
transfer flowfile to success relationship.\n" + + " 2. transfer flowfile to failure relationship.\n" + + " 3. rollback the flowfile without penalty.\n" + + " 4. rollback the flowfile and yield the context.\n" + + " 5. rollback the flowfile with penalty.\n" + + " 6. throw an exception.\n" + + "\n" + + "The order of responses when no flow file is received are:" + + " 1. yield the context.\n" + + " 2. throw an exception.\n" + + " 3. do nothing and return.\n" + + "\n" + + "By default, the processor is configured to perform each response one time. After processing the list of " + + "responses it will resume from the top of the list.\n" + + "\n" + + "To suppress any response, it's value can be set to zero (0) and no responses of that type will occur during " + + "processing.") +public class DebugFlow extends AbstractProcessor { + + private Set<Relationship> relationships = null; + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Flowfiles processed successfully.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Flowfiles that failed to process.") + .build(); + + private List<PropertyDescriptor> propertyDescriptors = null; + + static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder() + .name("Success Iterations") + .description("Number of flowfiles to forward to success relationship.") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder() + .name("Failure Iterations") + .description("Number of flowfiles to forward to failure relationship.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder() + .name("Rollback 
Iterations") + .description("Number of flowfiles to roll back (without penalty).") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder() + .name("Rollback Yield Iterations") + .description("Number of flowfiles to roll back and yield.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder() + .name("Rollback Penalty Iterations") + .description("Number of flowfiles to roll back with penalty.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder() + .name("Exception Iterations") + .description("Number of flowfiles to throw NPE exception.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder() + .name("No Flowfile Exception Iterations") + .description("Number of times to throw NPE exception if no flowfile.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder() + .name("No Flowfile Yield Iterations") + .description("Number of times to yield if no flowfile.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder() + .name("No Flowfile Skip Iterations") + .description("Number of times to skip onTrigger if no flowfile.") + 
.required(true) + .defaultValue("1") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + Integer FF_SUCCESS_MAX = 0; + private Integer FF_FAILURE_MAX = 0; + private Integer FF_ROLLBACK_MAX = 0; + private Integer FF_YIELD_MAX = 0; + private Integer FF_PENALTY_MAX = 0; + private Integer FF_EXCEPTION_MAX = 0; + + private Integer NO_FF_EXCEPTION_MAX = 0; + private Integer NO_FF_YIELD_MAX = 0; + private Integer NO_FF_SKIP_MAX = 0; + + private Integer FF_SUCCESS_CURR = 0; + private Integer FF_FAILURE_CURR = 0; + private Integer FF_ROLLBACK_CURR = 0; + private Integer FF_YIELD_CURR = 0; + private Integer FF_PENALTY_CURR = 0; + private Integer FF_EXCEPTION_CURR = 0; + + private Integer NO_FF_EXCEPTION_CURR = 0; + private Integer NO_FF_YIELD_CURR = 0; + private Integer NO_FF_SKIP_CURR = 0; + + private FlowfileResponse curr_ff_resp; + private NoFlowfileResponse curr_noff_resp; + + private enum FlowfileResponse { + FF_SUCCESS_RESPONSE(0, 1), + FF_FAILURE_RESPONSE(1, 2), + FF_ROLLBACK_RESPONSE(2, 3), + FF_YIELD_RESPONSE(3, 4), + FF_PENALTY_RESPONSE(4, 5), + FF_EXCEPTION_RESPONSE(5, 0); + + private Integer id; + private Integer nextId; + private FlowfileResponse next; + + private static final Map<Integer, FlowfileResponse> byId = new HashMap<>(); + static { + for (FlowfileResponse rc : FlowfileResponse.values()) { + if (byId.put(rc.id, rc) != null) { + throw new IllegalArgumentException("duplicate id: " + rc.id); + } + } + for (FlowfileResponse rc : FlowfileResponse.values()) { + rc.next = byId.get(rc.nextId); + } + } + FlowfileResponse(Integer pId, Integer pNext) { + id = pId; + nextId = pNext; + } + FlowfileResponse getNextCycle() { + return next; + } + } + + private enum NoFlowfileResponse { + NO_FF_EXCEPTION_RESPONSE(0, 1), + NO_FF_YIELD_RESPONSE(1, 2), + NO_FF_SKIP_RESPONSE(2, 0); + + private Integer id; + private Integer nextId; + private NoFlowfileResponse next; + + private static final Map<Integer, NoFlowfileResponse> byId = new 
HashMap<>(); + static { + for (NoFlowfileResponse rc : NoFlowfileResponse.values()) { + if (byId.put(rc.id, rc) != null) { + throw new IllegalArgumentException("duplicate id: " + rc.id); + } + } + for (NoFlowfileResponse rc : NoFlowfileResponse.values()) { + rc.next = byId.get(rc.nextId); + } + } + NoFlowfileResponse(Integer pId, Integer pNext) { + id = pId; + nextId = pNext; + } + NoFlowfileResponse getNextCycle() { + return next; + } + } + + @Override + public Set<Relationship> getRelationships() { + if (relationships == null) { + HashSet<Relationship> relSet = new HashSet<>(); + relSet.add(REL_SUCCESS); + relSet.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(relSet); + } + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + if (propertyDescriptors == null) { + ArrayList<PropertyDescriptor> propList = new ArrayList<>(); + propList.add(FF_SUCCESS_ITERATIONS); + propList.add(FF_FAILURE_ITERATIONS); + propList.add(FF_ROLLBACK_ITERATIONS); + propList.add(FF_ROLLBACK_YIELD_ITERATIONS); + propList.add(FF_ROLLBACK_PENALTY_ITERATIONS); + propList.add(FF_EXCEPTION_ITERATIONS); + propList.add(NO_FF_EXCEPTION_ITERATIONS); + propList.add(NO_FF_YIELD_ITERATIONS); + propList.add(NO_FF_SKIP_ITERATIONS); + propertyDescriptors = Collections.unmodifiableList(propList); + } + return propertyDescriptors; + } + + @SuppressWarnings("unused") + @OnScheduled + public void onScheduled(ProcessContext context) { + FF_SUCCESS_MAX = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger(); + FF_FAILURE_MAX = context.getProperty(FF_FAILURE_ITERATIONS).asInteger(); + FF_YIELD_MAX = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger(); + FF_ROLLBACK_MAX = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger(); + FF_PENALTY_MAX = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger(); + FF_EXCEPTION_MAX = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger(); + NO_FF_EXCEPTION_MAX = 
context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger(); + NO_FF_YIELD_MAX = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger(); + NO_FF_SKIP_MAX = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger(); + curr_ff_resp = FlowfileResponse.FF_SUCCESS_RESPONSE; + curr_noff_resp = NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + + FlowFile ff = session.get(); + + // Make up to 2 passes to allow rollover from last cycle to first. + // (This could be "while(true)" since responses should break out if selected, but this + // prevents endless loops in the event of unexpected errors or future changes.) + int pass = 2; + while (pass > 0) { + pass -= 1; + if (ff == null) { + if (curr_noff_resp == NoFlowfileResponse.NO_FF_EXCEPTION_RESPONSE) { + if (NO_FF_EXCEPTION_CURR < NO_FF_EXCEPTION_MAX) { + NO_FF_EXCEPTION_CURR += 1; + logger.info("DebugFlow throwing NPE with no flow file"); + throw new NullPointerException("forced by " + this.getClass().getName()); + } else { + NO_FF_EXCEPTION_CURR = 0; + curr_noff_resp = curr_noff_resp.getNextCycle(); + } + } + if (curr_noff_resp == NoFlowfileResponse.NO_FF_YIELD_RESPONSE) { + if (NO_FF_YIELD_CURR < NO_FF_YIELD_MAX) { + NO_FF_YIELD_CURR += 1; + logger.info("DebugFlow yielding with no flow file"); + context.yield(); + break; + } else { + NO_FF_YIELD_CURR = 0; + curr_noff_resp = curr_noff_resp.getNextCycle(); + } + } + if (curr_noff_resp == NoFlowfileResponse.NO_FF_SKIP_RESPONSE) { + if (NO_FF_SKIP_CURR < NO_FF_SKIP_MAX) { + NO_FF_SKIP_CURR += 1; + logger.info("DebugFlow skipping with no flow file"); + return; + } else { + NO_FF_SKIP_CURR = 0; + curr_noff_resp = curr_noff_resp.getNextCycle(); + } + } + return; + } else { + if (curr_ff_resp == FlowfileResponse.FF_SUCCESS_RESPONSE) { + if (FF_SUCCESS_CURR < FF_SUCCESS_MAX) { + FF_SUCCESS_CURR += 1; + logger.info("DebugFlow 
transferring to success file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.transfer(ff, REL_SUCCESS); + session.commit(); + break; + } else { + FF_SUCCESS_CURR = 0; + curr_ff_resp = curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp == FlowfileResponse.FF_FAILURE_RESPONSE) { + if (FF_FAILURE_CURR < FF_FAILURE_MAX) { + FF_FAILURE_CURR += 1; + logger.info("DebugFlow transferring to failure file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.transfer(ff, REL_FAILURE); + session.commit(); + break; + } else { + FF_FAILURE_CURR = 0; + curr_ff_resp = curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp == FlowfileResponse.FF_ROLLBACK_RESPONSE) { + if (FF_ROLLBACK_CURR < FF_ROLLBACK_MAX) { + FF_ROLLBACK_CURR += 1; + logger.info("DebugFlow rolling back (no penalty) file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(); + session.commit(); + break; + } else { + FF_ROLLBACK_CURR = 0; + curr_ff_resp = curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp == FlowfileResponse.FF_YIELD_RESPONSE) { + if (FF_YIELD_CURR < FF_YIELD_MAX) { + FF_YIELD_CURR += 1; + logger.info("DebugFlow yielding file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(); + context.yield(); + return; + } else { + FF_YIELD_CURR = 0; + curr_ff_resp = curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp == FlowfileResponse.FF_PENALTY_RESPONSE) { + if (FF_PENALTY_CURR < FF_PENALTY_MAX) { + FF_PENALTY_CURR += 1; + logger.info("DebugFlow rolling back (with penalty) file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(true); + session.commit(); + 
break; + } else { + FF_PENALTY_CURR = 0; + curr_ff_resp = curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp == FlowfileResponse.FF_EXCEPTION_RESPONSE) { + if (FF_EXCEPTION_CURR < FF_EXCEPTION_MAX) { + FF_EXCEPTION_CURR += 1; + logger.info("DebugFlow throwing NPE file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + throw new NullPointerException("forced by " + this.getClass().getName()); --- End diff -- Why an NPE here? Would it be overkill to allow the class of the Throwable thrown here to be specified in the configuration? > Create FlowDebugger processor > ----------------------------- > > Key: NIFI-1829 > URL: https://issues.apache.org/jira/browse/NIFI-1829 > Project: Apache NiFi > Issue Type: Improvement > Reporter: Joe Skora > Assignee: Joe Skora > Priority: Minor > Fix For: 1.0.0, 0.7.0 > > > The FlowDebugger processor allows a variety of Processor responses and > failures to be simulated for testing, debugging, and troubleshooting the > framework. > Responses it can produce on receipt of a flowfile include Transfer to > Success, Transfer to Failure, Rollback without Penalty, Rollback and Yield, > Rollback with Penalty, and Throw an Exception. The properties indicate how > many times each response should occur; for example, if configured with > Success=10 and Failure=40, it will transfer the first 10 flowfiles to > Success, transfer the next 40 to Failure, and then repeat. > Similarly, responses it can produce when triggered without a flowfile include > Throw an Exception, Yield, and Return (do nothing). -- This message was sent by Atlassian JIRA (v6.3.4#6332)