markap14 commented on a change in pull request #5593: URL: https://github.com/apache/nifi/pull/5593#discussion_r792878063
########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java ########## @@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) { } } } + + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + @Override + public Set<String> getRetriedRelationships() { + return Collections.EMPTY_SET; Review comment: Should use `Collections.emptySet()` ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java ########## @@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) { } } } + + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; Review comment: There's no need to store this if we always return `0`. 
This can be a no-op ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ########## @@ -289,63 +292,112 @@ private void checkpoint(final boolean copyCollections) { // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>(); + final Map<String, StandardRepositoryRecord> uuidsToRecords = records.values() + .stream() + .collect(Collectors.toMap(record -> record.getCurrent().getAttribute(CoreAttributes.UUID.key()), Function.identity())); + for (final StandardRepositoryRecord record : records.values()) { if (record.isMarkedForDelete()) { continue; } + if (records.get(record.getCurrent().getId()) == null) { + continue; + } + final Relationship relationship = record.getTransferRelationship(); - final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship)); - if (destinations.isEmpty() && relationship == Relationship.SELF) { - record.setDestination(record.getOriginalQueue()); - } else if (destinations.isEmpty()) { - record.markForDelete(); + final Connectable connectable = context.getConnectable(); + final FlowFileRecord currentFlowFile = record.getOriginal(); Review comment: Is the intent here to get the original version of the FlowFile or the current version? I'm confused by the variable name `currentFlowFile` when we're using `record.getOriginal()` - we should either rename variable to `originalFlowFile` if that's what we want or call `record.getCurrent()` if that's what we want. 
########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ########## @@ -301,13 +301,13 @@ private void checkpoint(final boolean copyCollections) { continue; } - final Relationship relationship = record.getTransferRelationship(); - ProcessorNode processorNode = null; - - if (context.getConnectable() instanceof ProcessorNode) { - processorNode = (ProcessorNode) context.getConnectable(); + if (records.get(record.getCurrent().getId()) == null) { + continue; } Review comment: Why was this check added? The Record came from the `this.records` Map - why are we checking that it exists in the map? ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java ########## @@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) { } } } + + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + @Override + public Set<String> getRetriedRelationships() { + return Collections.EMPTY_SET; + } + + @Override + public void setRetriedRelationships(Set<String> retriedRelationships) { + this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships); + } + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return false; + } + + @Override + public BackoffMechanism getBackoffMechanism() { + return BackoffMechanism.PENALIZE_FLOWFILE; + } + + @Override + public void setBackoffMechanism(BackoffMechanism backoffMechanism) { + this.backoffMechanism = (backoffMechanism == null) ? 
BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism; + } + + @Override + public String getMaxBackoffPeriod() { + return DEFAULT_MAX_BACKOFF_PERIOD; + } + + @Override + public void setMaxBackoffPeriod(String maxBackoffPeriod) { + if (maxBackoffPeriod == null) { Review comment: There's no need to store this if we always return the same value. This can be a no-op ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ########## @@ -289,63 +292,112 @@ private void checkpoint(final boolean copyCollections) { // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>(); + final Map<String, StandardRepositoryRecord> uuidsToRecords = records.values() + .stream() + .collect(Collectors.toMap(record -> record.getCurrent().getAttribute(CoreAttributes.UUID.key()), Function.identity())); + for (final StandardRepositoryRecord record : records.values()) { if (record.isMarkedForDelete()) { continue; } + if (records.get(record.getCurrent().getId()) == null) { + continue; + } + final Relationship relationship = record.getTransferRelationship(); - final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship)); - if (destinations.isEmpty() && relationship == Relationship.SELF) { - record.setDestination(record.getOriginalQueue()); - } else if (destinations.isEmpty()) { - record.markForDelete(); + final Connectable connectable = context.getConnectable(); + final FlowFileRecord currentFlowFile = record.getOriginal(); + int retryCount = 0; + if (currentFlowFile != null) { + String retryCountString = currentFlowFile.getAttribute("retryCount"); + if (retryCountString != null) { + retryCount = Integer.parseInt(retryCountString); + } + } + + if (isRetryNeeded(connectable, record, currentFlowFile, 
retryCount, uuidsToRecords)) { - if (autoTerminatedEvents == null) { - autoTerminatedEvents = new ArrayList<>(); + final boolean isProcessorYielding = connectable.getBackoffMechanism().equals(BackoffMechanism.YIELD_PROCESSOR); + final long maxBackoffPeriod = Math.round(FormatUtils.getPreciseTimeDuration(connectable.getMaxBackoffPeriod(), TimeUnit.MILLISECONDS)); + final long penalizationTime; + + if (isProcessorYielding) { + penalizationTime = connectable.getYieldPeriod(TimeUnit.MILLISECONDS); + } else { + penalizationTime = connectable.getPenalizationPeriod(TimeUnit.MILLISECONDS); } - final ProvenanceEventRecord dropEvent; - try { - dropEvent = provenanceReporter.generateDropEvent(record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship"); - autoTerminatedEvents.add(dropEvent); - } catch (final Exception e) { - LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", record.getCurrent(), connectableDescription, e); - if (LOG.isDebugEnabled()) { - LOG.warn("", e); - } + final long backoffTime = calculateBackoffTime(retryCount, maxBackoffPeriod, penalizationTime); + retryCount++; + + adjustConnectableStatsForRetry(record, relationship, uuidsToRecords); + final FlowFileRecord updatedRecord = resetFlowFileRecordAndUpdateItForRetry(record, uuidsToRecords, retryCount, currentFlowFile); + clearProvenanceEventsForRetry(currentFlowFile); + + if (isProcessorYielding) { + connectable.yield(backoffTime, TimeUnit.MILLISECONDS); + } else { + penalize(updatedRecord, backoffTime, TimeUnit.MILLISECONDS); } + } else { - final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element - record.setDestination(finalDestination.getFlowFileQueue()); - incrementConnectionInputCounts(finalDestination, record); - - for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed - incrementConnectionInputCounts(destination, record); - final 
FlowFileRecord currRec = record.getCurrent(); - final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); - builder.id(context.getNextFlowFileSequence()); - - final String newUuid = UUID.randomUUID().toString(); - builder.addAttribute(CoreAttributes.UUID.key(), newUuid); - - final FlowFileRecord clone = builder.build(); - final StandardRepositoryRecord newRecord = new StandardRepositoryRecord(destination.getFlowFileQueue()); - provenanceReporter.clone(currRec, clone, false); - - final ContentClaim claim = clone.getContentClaim(); - if (claim != null) { - context.getContentRepository().incrementClaimaintCount(claim); + final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship)); + + if (destinations.isEmpty() && relationship == Relationship.SELF) { + record.setDestination(record.getOriginalQueue()); + } else if (destinations.isEmpty()) { + record.markForDelete(); + + if (autoTerminatedEvents == null) { + autoTerminatedEvents = new ArrayList<>(); + } + + final ProvenanceEventRecord dropEvent; + try { + dropEvent = provenanceReporter.generateDropEvent(record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship"); + autoTerminatedEvents.add(dropEvent); + } catch (final Exception e) { + LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", record.getCurrent(), connectableDescription, e); + if (LOG.isDebugEnabled()) { + LOG.warn("", e); + } } - newRecord.setWorking(clone, Collections.<String, String> emptyMap(), false); + } else { + final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element + FlowFileRecord currRec = record.getCurrent(); + StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); + builder.removeAttributes("retryCount"); Review comment: I can imagine this potentially causing problems if a user is using an attribute named `retryCount` 
(and I think this is common for users who have built their own retry logic since this hasn't been available thus far). Perhaps we should name it `retryCount.<Component ID>` to avoid naming collisions. ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ########## @@ -289,63 +292,112 @@ private void checkpoint(final boolean copyCollections) { // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>(); + final Map<String, StandardRepositoryRecord> uuidsToRecords = records.values() + .stream() + .collect(Collectors.toMap(record -> record.getCurrent().getAttribute(CoreAttributes.UUID.key()), Function.identity())); + for (final StandardRepositoryRecord record : records.values()) { if (record.isMarkedForDelete()) { continue; } + if (records.get(record.getCurrent().getId()) == null) { + continue; + } + final Relationship relationship = record.getTransferRelationship(); - final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship)); - if (destinations.isEmpty() && relationship == Relationship.SELF) { - record.setDestination(record.getOriginalQueue()); - } else if (destinations.isEmpty()) { - record.markForDelete(); + final Connectable connectable = context.getConnectable(); + final FlowFileRecord currentFlowFile = record.getOriginal(); + int retryCount = 0; + if (currentFlowFile != null) { + String retryCountString = currentFlowFile.getAttribute("retryCount"); + if (retryCountString != null) { + retryCount = Integer.parseInt(retryCountString); Review comment: This should be wrapped in a try/catch - a NumberFormatException may well be thrown here if the user sets the attribute themselves. 
########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java ########## @@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) { } } } + + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + @Override + public Set<String> getRetriedRelationships() { + return Collections.EMPTY_SET; + } + + @Override + public void setRetriedRelationships(Set<String> retriedRelationships) { + this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships); Review comment: There's no need to store this if we always return `emptySet()`. This can be a no-op ########## File path: nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java ########## @@ -169,6 +169,10 @@ private void compare(final VersionedProcessor processorA, final VersionedProcess addIfDifferent(differences, DifferenceType.SCHEDULED_STATE_CHANGED, processorA, processorB, VersionedProcessor::getScheduledState); addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, VersionedProcessor::getStyle); addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getYieldDuration); + addIfDifferent(differences, DifferenceType.RETRY_COUNTS_CHANGED, processorA, processorB, VersionedProcessor::getRetryCount); Review comment: Should be DifferenceType.RETRY_COUNT_CHANGED ########## File path: nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java ########## @@ -144,6 +144,10 @@ public VersionedProcessor createProcessor(final Bundle bundle, final String type processor.setType(type); processor.setYieldDuration("1 sec"); 
processor.setSchedulingStrategy("TIMER_DRIVEN"); + processor.setRetryCount(0); + processor.setBackoffMechanism("PENALIZE_FLOWFILE"); + processor.setRetriedRelationships(new HashSet<>()); Review comment: Should probably use `Collections.emptySet()` ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java ########## @@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) { } } } + + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + @Override + public Set<String> getRetriedRelationships() { + return Collections.EMPTY_SET; + } + + @Override + public void setRetriedRelationships(Set<String> retriedRelationships) { + this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships); + } + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return false; + } + + @Override + public BackoffMechanism getBackoffMechanism() { + return BackoffMechanism.PENALIZE_FLOWFILE; + } + + @Override + public void setBackoffMechanism(BackoffMechanism backoffMechanism) { + this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism; Review comment: There's no need to store this if we always return the same value. 
This can be a no-op ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ########## @@ -356,6 +408,117 @@ private void checkpoint(final boolean copyCollections) { checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections); } + private boolean isRetryNeeded(final Connectable connectable, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile, + final int retryCount, final Map<String, StandardRepositoryRecord> uuidsToRecords) { + if (currentFlowFile == null || connectable == null || connectable.getRetriedRelationships().isEmpty()) { + return false; + } + + if (connectable.isRelationshipRetried(record.getTransferRelationship())) { + return retryCount < connectable.getRetryCount(); + } + + final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(currentFlowFile); + if (eventBuilder != null) { + for (String uuid : eventBuilder.getChildFlowFileIds()) { + if (connectable.isRelationshipRetried(uuidsToRecords.get(uuid).getTransferRelationship())) { + return retryCount < connectable.getRetryCount(); + } + } + } Review comment: I'm a bit concerned about the use of provenance events here for determining whether or not a FlowFile needs to be retried. I can definitely appreciate why that association was made here - it's the data model that we have that also encapsulates this linkage. But the provenance is really about "auditing" more so than the logic of how FlowFiles within the session should be linked. This also can get very expensive if we are dealing with multiple splits. It probably makes sense to introduce a separate linkage between the FlowFiles outside of the provenance events. 
########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java ########## @@ -80,6 +80,10 @@ private static final String SCHEDULING_STRATEGY = "Scheduling Strategy"; private static final String EXECUTION_NODE = "Execution Node"; private static final String EXTENSION_VERSION = "Extension Version"; + private static final String RETRY_COUNTS = "Retry Counts"; Review comment: Retry Count, rather than Counts :) ########## File path: nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java ########## @@ -144,6 +144,10 @@ public VersionedProcessor createProcessor(final Bundle bundle, final String type processor.setType(type); processor.setYieldDuration("1 sec"); processor.setSchedulingStrategy("TIMER_DRIVEN"); + processor.setRetryCount(0); + processor.setBackoffMechanism("PENALIZE_FLOWFILE"); Review comment: Should probably use `BackoffMechanism.PENALIZE_FLOWFILE.name()` ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java ########## @@ -577,6 +588,58 @@ public String getComponentType() { return "Funnel"; } + @Override + public int getRetryCount() { + return 0; + } + + @Override + public void setRetryCount(Integer retryCount) { + this.retryCount = retryCount; + } + + @Override + public Set<String> getRetriedRelationships() { + return Collections.EMPTY_SET; + } + + @Override + public void setRetriedRelationships(Set<String> retriedRelationships) { + this.retriedRelationships = (retriedRelationships == null) ? 
Collections.emptySet() : new HashSet<>(retriedRelationships); + } + + @Override + public boolean isRelationshipRetried(Relationship relationship) { + return false; + } + + @Override + public BackoffMechanism getBackoffMechanism() { + return BackoffMechanism.PENALIZE_FLOWFILE; + } + + @Override + public void setBackoffMechanism(BackoffMechanism backoffMechanism) { + this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism; + } + + @Override + public String getMaxBackoffPeriod() { + return DEFAULT_MAX_BACKOFF_PERIOD; + } + + @Override + public void setMaxBackoffPeriod(String maxBackoffPeriod) { + if (maxBackoffPeriod == null) { + maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD; + } + final long backoffNanos = FormatUtils.getTimeDuration(maxBackoffPeriod, TimeUnit.NANOSECONDS); + if (backoffNanos < 0) { + throw new IllegalArgumentException("Max Backoff Period must be positive"); + } + this.maxBackoffPeriod = maxBackoffPeriod; Review comment: Again, no need to implement setters or have the member variables if we aren't going to make use of them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org