[ https://issues.apache.org/jira/browse/NIFI-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962381#comment-15962381 ]
ASF GitHub Bot commented on NIFI-3414: -------------------------------------- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/1496#discussion_r110564978 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java --- @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; + +@EventDriven +@Tags({"sort", "order"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@TriggerSerially +@CapabilityDescription("Enforces expected ordering of FlowFiles those belong to the same data group. " + + " Although PriorityAttributePrioritizer can be used on a connection to ensure that flow files going through that connection are in priority order," + + " depending on error-handling, branching, and other flow designs, it is possible for FlowFiles to get out-of-order." + + " EnforceOrder can be used to enforce original ordering for those FlowFiles." + + " [IMPORTANT] In order to take effect of EnforceOrder, FirstInFirstOutPrioritizer should be used at EVERY downstream relationship" + + " UNTIL the order of FlowFiles physically get FIXED by operation such as MergeContent or being stored to the final destination.") +@Stateful(scopes = Scope.LOCAL, description = "EnforceOrder uses following states per ordering group:" + + " '<groupId>.target' is a order number which is being waited to arrive next." + + " When a FlowFile with a matching order arrives, or a FlowFile overtakes the FlowFile being waited for because of wait timeout," + + " target order will be updated to (FlowFile.order + 1)." + + " '<groupId>.max is the maximum order number for a group." + + " '<groupId>.updatedAt' is a timestamp when the order of a group was updated last time." + + " These managed states will be removed automatically once a group is determined as inactive, see 'Inactive Timeout' for detail.") +@WritesAttributes({ + @WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT, + description = "All FlowFiles going through this processor will have this attribute. This value is used to determine wait timeout."), + @WritesAttribute(attribute = EnforceOrder.ATTR_RESULT, + description = "All FlowFiles going through this processor will have this attribute denoting which relationship it was routed to."), + @WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL, + description = "FlowFiles routed to 'failure' or 'skipped' relationship will have this attribute describing details."), + @WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER, + description = "FlowFiles routed to 'wait' or 'skipped' relationship will have this attribute denoting expected order when the FlowFile was processed.") +}) +public class EnforceOrder extends AbstractProcessor { + + public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt"; + public static final String ATTR_EXPECTED_ORDER = "EnforceOrder.expectedOrder"; + public static final String ATTR_RESULT = "EnforceOrder.result"; + public static final String ATTR_DETAIL = "EnforceOrder.detail"; + private static final Function<String, String> STATE_TARGET_ORDER = groupId -> groupId + ".target"; + private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt"; + private static final Function<String, String> STATE_UPDATED_AT = groupId -> groupId + STATE_SUFFIX_UPDATED_AT; + private static final Function<String, String> STATE_MAX_ORDER = groupId -> groupId + ".max"; + + public static final PropertyDescriptor GROUP_IDENTIFIER = new PropertyDescriptor.Builder() + .name("group-id") + .displayName("Group Identifier") + .description("EnforceOrder is capable of multiple ordering groups." + + " 'Group Identifier' is used to determine which group a FlowFile belongs to." + + " This property will be evaluated with each incoming FlowFile." + + " If evaluated result is empty, the FlowFile will be routed to failure.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("${filename}") + .build(); + + public static final PropertyDescriptor ORDER_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("order-attribute") + .displayName("Order Attribute") + .description("A name of FlowFile attribute whose value will be used to enforce order of FlowFiles within a group." + + " If a FlowFile does not have this attribute, or its value is not an integer, the FlowFile will be routed to failure.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor INITIAL_ORDER = new PropertyDescriptor.Builder() + .name("initial-order") + .displayName("Initial Order") + .description("When the first FlowFile of a group arrives, initial target order will be computed and stored in the managed state." + + " After that, target order will start being tracked by EnforceOrder and stored in the state management store." + + " If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," + + " and initial order will be left unknown until consecutive FlowFiles provide a valid initial order.") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor MAX_ORDER = new PropertyDescriptor.Builder() + .name("maximum-order") + .displayName("Maximum Order") + .description("If specified, any FlowFiles that has larger order will be routed to failure." + + " This property is computed only once for a given group." + + " After a maximum order is computed, it will be persisted in the state management store and used for other FlowFiles belonging to the same group." + + " If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," + + " and maximum order will be left unknown until consecutive FlowFiles provide a valid maximum order.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor WAIT_TIMEOUT = new PropertyDescriptor.Builder() + .name("wait-timeout") + .displayName("Wait Timeout") + .description("Indicates the duration after which waiting FlowFiles will be routed to the 'overtook' relationship.") + .required(true) + .defaultValue("10 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor INACTIVE_TIMEOUT = new PropertyDescriptor.Builder() + .name("inactive-timeout") + .displayName("Inactive Timeout") + .description("Indicates the duration after which state for an inactive group will be cleared from managed state." + + " Group is determined as inactive if any new incoming FlowFile has not seen for a group for specified duration." + + " Inactive Timeout must be longer than Wait Timeout." + + " If a FlowFile arrives late after its group is already cleared, it will be treated as a brand new group," + + " but will never match the order since expected preceding FlowFiles are already gone." + + " The FlowFile will eventually timeout for waiting and routed to 'overtook'." + + " To avoid this, group states should be kept long enough, however, shorter duration would be helpful for reusing the same group identifier again.") + .required(true) + .defaultValue("30 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder() + .name("batch-count") + .displayName("Batch Count") + .description("The maximum number of FlowFiles that EnforceOrder can process at an execution.") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile with a matching order number will be routed to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFiles which does not have required attributes, or fails to compute those will be routed to this relationship") + .build(); + + public static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("A FlowFile with non matching order will be routed to this relationship") + .build(); + + public static final Relationship REL_OVERTOOK = new Relationship.Builder() + .name("overtook") + .description("A FlowFile that waited for preceding FlowFiles longer than Wait Timeout and overtook those FlowFiles, will be routed to this relationship.") + .build(); + + public static final Relationship REL_SKIPPED = new Relationship.Builder() + .name("skipped") + .description("A FlowFile that has an order younger than current, which means arrived too late and skipped, will be routed to this relationship.") + .build(); + + private final Set<Relationship> relationships; + + public EnforceOrder() { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_WAIT); + rels.add(REL_OVERTOOK); + rels.add(REL_FAILURE); + rels.add(REL_SKIPPED); + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(GROUP_IDENTIFIER); + descriptors.add(ORDER_ATTRIBUTE); + descriptors.add(INITIAL_ORDER); + descriptors.add(MAX_ORDER); + descriptors.add(BATCH_COUNT); + descriptors.add(WAIT_TIMEOUT); + descriptors.add(INACTIVE_TIMEOUT); + return descriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext)); + + final Long waitTimeoutMillis = validationContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS); + final Long inactiveTimeoutMillis = validationContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS); + + if (waitTimeoutMillis >= inactiveTimeoutMillis) { + results.add(new ValidationResult.Builder().input(validationContext.getProperty(INACTIVE_TIMEOUT).getValue()) + .subject(INACTIVE_TIMEOUT.getDisplayName()) + .explanation(String.format("%s should be longer than %s", + INACTIVE_TIMEOUT.getDisplayName(), WAIT_TIMEOUT.getDisplayName())) + .valid(false) + .build()); + } + + return results; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + final ComponentLog logger = getLogger(); + final Integer batchCount = context.getProperty(BATCH_COUNT).asInteger(); + + final StateMap stateMap; + try { + stateMap = context.getStateManager().getState(Scope.LOCAL); + } catch (final IOException e) { + logger.error("Failed to retrieve state from StateManager due to {}" + e, e); + context.yield(); + return; + } + + final OrderingContext oc = new OrderingContext(context, session); + + oc.groupStates.putAll(stateMap.toMap()); + + for (int i = 0; i < batchCount; i++) { + + oc.setFlowFile(session.get()); + if (oc.flowFile == null) { + break; + } + + if (!oc.computeGroupId() + || !oc.computeOrder() + || !oc.computeInitialOrder() + || !oc.computeMaxOrder()) { + continue; + } + + // At this point, the flow file is confirmed to be valid. + oc.markFlowFileValied(); + } + + oc.transferFlowFiles(); + + oc.cleanupInactiveStates(); + + try { + context.getStateManager().setState(oc.groupStates, Scope.LOCAL); + } catch (final IOException e) { + throw new RuntimeException("Failed to update state due to " + e + + ". Session will be rollback and processor will be yielded for a while.", e); + } + + } + + private class OrderingContext { + + private final ComponentLog logger = getLogger(); + private final ProcessSession processSession; + private final ProcessContext processContext; + + // Following properties are static global setting for all groups. + private final String orderAttribute; + private final Long waitTimeoutMillis; + private final Function<FlowFile, Integer> getOrder; + + private final Map<String, String> groupStates = new HashMap<>(); + private final long now = System.currentTimeMillis(); + + // Following properties are computed per flow file. + private final PropertyValue groupIdentifierProperty ; + + // Followings are per group objects. + private final PropertyValue initOrderProperty; + private final PropertyValue maxOrderProperty; + private final Map<String, List<FlowFile>> flowFileGroups = new TreeMap<>(); + + // Current variables within incoming FlowFiles loop. + private FlowFile flowFile; + private String groupId; + private Integer order; + + private OrderingContext(final ProcessContext processContext, final ProcessSession processSession) { + this.processContext = processContext; + this.processSession = processSession; + + orderAttribute = processContext.getProperty(ORDER_ATTRIBUTE).getValue(); + waitTimeoutMillis = processContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + getOrder = flowFile -> Integer.parseInt(flowFile.getAttribute(orderAttribute)); + + + groupIdentifierProperty = processContext.getProperty(GROUP_IDENTIFIER); + + initOrderProperty = processContext.getProperty(INITIAL_ORDER); + maxOrderProperty = processContext.getProperty(MAX_ORDER); + } + + private void setFlowFile(final FlowFile flowFile) { + this.flowFile = flowFile; + this.groupId = null; + this.order = null; + } + + private boolean computeGroupId() { + groupId = groupIdentifierProperty.evaluateAttributeExpressions(flowFile).getValue(); + if (isBlank(groupId)) { + transferToFailure(flowFile, "Failed to get Group Identifier."); + return false; + } + return true; + } + + private boolean computeOrder() { + try { + order = getOrder.apply(flowFile); + } catch (final NumberFormatException e) { + transferToFailure(flowFile, "Failed to parse order attribute due to " + e, e); + return false; + } + return true; + } + + private boolean computeMaxOrder() { + if (maxOrderProperty.isSet()) { + // Compute maxOrder for this group if it's not there yet. + final String maxOrderStr = groupStates.computeIfAbsent(STATE_MAX_ORDER.apply(groupId), + k -> maxOrderProperty.evaluateAttributeExpressions(flowFile).getValue()); + if (isBlank(maxOrderStr)) { + transferToFailure(flowFile, String.format("%s was specified but result was empty.", MAX_ORDER.getDisplayName())); + return false; + } + + final Integer maxOrder; + try { + maxOrder = Integer.parseInt(maxOrderStr); + } catch (final NumberFormatException e) { + final String msg = String.format("Failed to get Maximum Order for group [%s] due to %s", groupId, e); + transferToFailure(flowFile, msg, e); + return false; + } + + // Check max order. + if (order > maxOrder) { + final String msg = String.format("Order (%d) is greater than the Maximum Order (%d) for Group [%s]", order, maxOrder, groupId); + transferToFailure(flowFile, msg); + return false; + } + } + return true; + } + + private boolean computeInitialOrder() { + // Compute initial order. Use asInteger() to check if it's a valid integer. + final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId); + try { + final AtomicReference<String> computedInitOrder = new AtomicReference<>(); + groupStates.computeIfAbsent(stateKeyOrder, k -> { + final String initOrderStr = initOrderProperty.evaluateAttributeExpressions(flowFile).getValue(); + final int initOrder = Integer.parseInt(initOrderStr); + computedInitOrder.set(initOrderStr); + return initOrderStr; + }); + // If these map modification is in the computeIfAbsent function, it causes this issue. + // JDK-8071667 : HashMap.computeIfAbsent() adds entry that HashMap.get() does not find. + // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8071667 + if (!isBlank(computedInitOrder.get())) { + groupStates.put(STATE_UPDATED_AT.apply(groupId), String.valueOf(now)); + } + + } catch (final NumberFormatException e) { + final String msg = String.format("Failed to get Initial Order for Group [%s] due to %s", groupId, e); + transferToFailure(flowFile, msg, e); + return false; + } + return true; + } + + private void markFlowFileValied() { --- End diff -- Thanks, fixed it. > Implement an EnforceOrder processor > ----------------------------------- > > Key: NIFI-3414 > URL: https://issues.apache.org/jira/browse/NIFI-3414 > Project: Apache NiFi > Issue Type: New Feature > Reporter: Matt Burgess > Assignee: Koji Kawamura > > For some flows, it is imperative that the flow files are processed in a > certain order. The PriorityAttributePrioritizer can be used on a connection > to ensure that flow files going through that connection are in priority > order, but depending on error-handling, branching, and other flow designs, it > is possible for flow files to get out-of-order. > I propose an EnforceOrder processor, which would be single-threaded and have > (at a minimum) the following properties: > 1) Order Attribute: This would be the name of a flow file attribute from > which the current value will be retrieved. > 2) Initial Value: This property specifies an initial value for the order. The > processor is stateful, however, so this property is only used when there is > no entry in the state map for current value. > The processor would store the Initial Value into the state map (if no state > map entry exists), then for each incoming flow file, it checks the value in > the Order Attribute against the current value. If the attribute value > matches the current value, the flow file is transferred to the "success" > relationship, and the current value is incremented in the state map. If the > attribute value does not match the current value, the session will be rolled > back. > Using this processor, along with a PriorityAttributePrioritizer on the > incoming connection, will allow for out-of-order flow files to have a sort of > "barrier", thereby guaranteeing that flow files transferred to the "success" > relationship are in the specified order. -- This message was sent by Atlassian JIRA (v6.3.15#6346)