Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/3078#discussion_r226012968 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PenalizeFlowFile.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"penalty", "penalize", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Penalizes a FlowFile.") +@WritesAttributes({ + @WritesAttribute(attribute = "penalization.count.{processor uuid}", description = "How many times this processor has penalized this FlowFile.") +}) + +public class PenalizeFlowFile extends AbstractProcessor { + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully penalized FlowFile").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + // Track how many times a FlowFile passes through this processor to better support the Retry use case + final String retryAttrName = "penalization.count." + this.getIdentifier(); + final String initialCount = flowFile.getAttribute(retryAttrName); + long cnt = 0; + if(initialCount != null) { + cnt = Long.parseLong(initialCount); + } + + cnt++; + + flowFile = session.putAttribute(flowFile, retryAttrName, Long.toString(cnt)); --- End diff -- I agree. I've updated the PR. All of those other features related to retry should be handled under NIFI-3792. It sounds like there is a processor setup, he's planning to PR it soon.
---