markap14 commented on a change in pull request #5593:
URL: https://github.com/apache/nifi/pull/5593#discussion_r792878063



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
##########
@@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) {
             }
         }
     }
+
+    @Override
+    public int getRetryCount() {
+        return 0;
+    }
+
+    @Override
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        return Collections.EMPTY_SET;

Review comment:
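       Should use `Collections.emptySet()` rather than the raw `Collections.EMPTY_SET`, which is an untyped `Set` and needs an unchecked conversion to `Set<String>`.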
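A minimal sketch of the suggested change, assuming only the getter signature shown in the diff. `RetriedRelationshipsSource` and `PortRetrySettings` are hypothetical stand-ins for `Connectable` and `AbstractPort`. Because `Collections.emptySet()` is generic, the compiler infers `Set<String>` and no raw type is involved, while the result is still an immutable empty set.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for the retried-relationships getter on Connectable.
interface RetriedRelationshipsSource {
    Set<String> getRetriedRelationships();
}

// Hypothetical stand-in for AbstractPort: ports never retry.
class PortRetrySettings implements RetriedRelationshipsSource {
    @Override
    public Set<String> getRetriedRelationships() {
        // The element type is inferred as String, so there is no raw type and
        // no unchecked warning, unlike returning Collections.EMPTY_SET.
        return Collections.emptySet();
    }
}
```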

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
##########
@@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) {
             }
         }
     }
+
+    @Override
+    public int getRetryCount() {
+        return 0;
+    }
+
+    @Override
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;

Review comment:
       There's no need to store this if we always return `0`. This can be a no-op.
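A sketch of what the reviewer describes, assuming a port never supports retries: the getter returns a constant and the setter ignores its argument, so the `retryCount` field can be dropped. `RetryCountSettings` and `NonRetryingPort` are hypothetical names used only for illustration; the same pattern would apply to the other retry-related setters flagged in this review.

```java
// Hypothetical subset of the retry-related methods on Connectable.
interface RetryCountSettings {
    int getRetryCount();

    void setRetryCount(Integer retryCount);
}

// Hypothetical stand-in for AbstractPort: no retryCount field is needed,
// because the getter always reports that retries are disabled.
class NonRetryingPort implements RetryCountSettings {
    @Override
    public int getRetryCount() {
        return 0;
    }

    @Override
    public void setRetryCount(final Integer retryCount) {
        // Intentionally a no-op: ports do not retry FlowFiles, so the
        // requested value is ignored rather than stored.
    }
}
```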

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -289,63 +292,112 @@ private void checkpoint(final boolean copyCollections) {
 
         // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
         final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>();
+        final Map<String, StandardRepositoryRecord> uuidsToRecords = records.values()
+                .stream()
+                .collect(Collectors.toMap(record -> record.getCurrent().getAttribute(CoreAttributes.UUID.key()), Function.identity()));
+
         for (final StandardRepositoryRecord record : records.values()) {
             if (record.isMarkedForDelete()) {
                 continue;
             }
 
+            if (records.get(record.getCurrent().getId()) == null) {
+                continue;
+            }
+
             final Relationship relationship = record.getTransferRelationship();
-            final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship));
 
-            if (destinations.isEmpty() && relationship == Relationship.SELF) {
-                record.setDestination(record.getOriginalQueue());
-            } else if (destinations.isEmpty()) {
-                record.markForDelete();
+            final Connectable connectable = context.getConnectable();
+            final FlowFileRecord currentFlowFile = record.getOriginal();

Review comment:
       Is the intent here to get the original version of the FlowFile or the current version? I'm confused by the variable name `currentFlowFile` when we're using `record.getOriginal()`. We should either rename the variable to `originalFlowFile`, if that's what we want, or call `record.getCurrent()`, if that's what we want.

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -301,13 +301,13 @@ private void checkpoint(final boolean copyCollections) {
                 continue;
             }
 
-            final Relationship relationship = record.getTransferRelationship();
-            ProcessorNode processorNode = null;
-
-            if (context.getConnectable() instanceof ProcessorNode) {
-                processorNode = (ProcessorNode) context.getConnectable();
+            if (records.get(record.getCurrent().getId()) == null) {
+                continue;
             }

Review comment:
       Why was this check added? The record came from the `this.records` map, so why are we checking that it exists in the map?

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
##########
@@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) {
             }
         }
     }
+
+    @Override
+    public int getRetryCount() {
+        return 0;
+    }
+
+    @Override
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        return Collections.EMPTY_SET;
+    }
+
+    @Override
+    public void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRelationshipRetried(Relationship relationship) {
+        return false;
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return BackoffMechanism.PENALIZE_FLOWFILE;
+    }
+
+    @Override
+    public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
+        this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism;
+    }
+
+    @Override
+    public String getMaxBackoffPeriod() {
+        return DEFAULT_MAX_BACKOFF_PERIOD;
+    }
+
+    @Override
+    public void setMaxBackoffPeriod(String maxBackoffPeriod) {
+        if (maxBackoffPeriod == null) {

Review comment:
       There's no need to store this if we always return the same value. This can be a no-op.

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -289,63 +292,112 @@ private void checkpoint(final boolean copyCollections) {
 
         // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
         final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>();
+        final Map<String, StandardRepositoryRecord> uuidsToRecords = records.values()
+                .stream()
+                .collect(Collectors.toMap(record -> record.getCurrent().getAttribute(CoreAttributes.UUID.key()), Function.identity()));
+
         for (final StandardRepositoryRecord record : records.values()) {
             if (record.isMarkedForDelete()) {
                 continue;
             }
 
+            if (records.get(record.getCurrent().getId()) == null) {
+                continue;
+            }
+
             final Relationship relationship = record.getTransferRelationship();
-            final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship));
 
-            if (destinations.isEmpty() && relationship == Relationship.SELF) {
-                record.setDestination(record.getOriginalQueue());
-            } else if (destinations.isEmpty()) {
-                record.markForDelete();
+            final Connectable connectable = context.getConnectable();
+            final FlowFileRecord currentFlowFile = record.getOriginal();
+            int retryCount = 0;
+            if (currentFlowFile != null) {
+                String retryCountString = currentFlowFile.getAttribute("retryCount");
+                if (retryCountString != null) {
+                    retryCount = Integer.parseInt(retryCountString);
+                }
+            }
+
+            if (isRetryNeeded(connectable, record, currentFlowFile, retryCount, uuidsToRecords)) {
 
-                if (autoTerminatedEvents == null) {
-                    autoTerminatedEvents = new ArrayList<>();
+                final boolean isProcessorYielding = connectable.getBackoffMechanism().equals(BackoffMechanism.YIELD_PROCESSOR);
+                final long maxBackoffPeriod = Math.round(FormatUtils.getPreciseTimeDuration(connectable.getMaxBackoffPeriod(), TimeUnit.MILLISECONDS));
+                final long penalizationTime;
+
+                if (isProcessorYielding) {
+                    penalizationTime = connectable.getYieldPeriod(TimeUnit.MILLISECONDS);
+                } else {
+                    penalizationTime = connectable.getPenalizationPeriod(TimeUnit.MILLISECONDS);
                 }
 
-                final ProvenanceEventRecord dropEvent;
-                try {
-                    dropEvent = provenanceReporter.generateDropEvent(record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship");
-                    autoTerminatedEvents.add(dropEvent);
-                } catch (final Exception e) {
-                    LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", record.getCurrent(), connectableDescription, e);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.warn("", e);
-                    }
+                final long backoffTime = calculateBackoffTime(retryCount, maxBackoffPeriod, penalizationTime);
+                retryCount++;
+
+                adjustConnectableStatsForRetry(record, relationship, uuidsToRecords);
+                final FlowFileRecord updatedRecord = resetFlowFileRecordAndUpdateItForRetry(record, uuidsToRecords, retryCount, currentFlowFile);
+                clearProvenanceEventsForRetry(currentFlowFile);
+
+                if (isProcessorYielding) {
+                    connectable.yield(backoffTime, TimeUnit.MILLISECONDS);
+                } else {
+                    penalize(updatedRecord, backoffTime, TimeUnit.MILLISECONDS);
                 }
+
             } else {
-                final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
-                record.setDestination(finalDestination.getFlowFileQueue());
-                incrementConnectionInputCounts(finalDestination, record);
-
-                for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
-                    incrementConnectionInputCounts(destination, record);
-                    final FlowFileRecord currRec = record.getCurrent();
-                    final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
-                    builder.id(context.getNextFlowFileSequence());
-
-                    final String newUuid = UUID.randomUUID().toString();
-                    builder.addAttribute(CoreAttributes.UUID.key(), newUuid);
-
-                    final FlowFileRecord clone = builder.build();
-                    final StandardRepositoryRecord newRecord = new StandardRepositoryRecord(destination.getFlowFileQueue());
-                    provenanceReporter.clone(currRec, clone, false);
-
-                    final ContentClaim claim = clone.getContentClaim();
-                    if (claim != null) {
-                        context.getContentRepository().incrementClaimaintCount(claim);
+                final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship));
+
+                if (destinations.isEmpty() && relationship == Relationship.SELF) {
+                    record.setDestination(record.getOriginalQueue());
+                } else if (destinations.isEmpty()) {
+                    record.markForDelete();
+
+                    if (autoTerminatedEvents == null) {
+                        autoTerminatedEvents = new ArrayList<>();
+                    }
+
+                    final ProvenanceEventRecord dropEvent;
+                    try {
+                        dropEvent = provenanceReporter.generateDropEvent(record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship");
+                        autoTerminatedEvents.add(dropEvent);
+                    } catch (final Exception e) {
+                        LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", record.getCurrent(), connectableDescription, e);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.warn("", e);
+                        }
                     }
-                    newRecord.setWorking(clone, Collections.<String, String> emptyMap(), false);
+                } else {
+                    final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
+                    FlowFileRecord currRec = record.getCurrent();
+                    StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
+                    builder.removeAttributes("retryCount");

Review comment:
       I can imagine this causing problems if a user already has an attribute named `retryCount` (I think this is common for users who have built their own retry logic, since this hasn't been available until now). Perhaps we should name it `retryCount.<Component ID>` to avoid naming collisions.
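One way the suggested naming could look, as a hypothetical helper (`RetryAttributes` is not part of the PR). It assumes the component ID is available as a `String` wherever the attribute is read and removed, so that clearing the framework's counter never touches a user-managed `retryCount` attribute.

```java
import java.util.Map;

// Hypothetical helper for the suggested "retryCount.<Component ID>" attribute naming.
final class RetryAttributes {
    private static final String RETRY_COUNT_PREFIX = "retryCount.";

    private RetryAttributes() {
    }

    // Builds the per-component attribute name, e.g. "retryCount.<Component ID>".
    static String retryCountAttributeName(final String componentId) {
        return RETRY_COUNT_PREFIX + componentId;
    }

    // Removes only this component's counter; a user-defined "retryCount"
    // attribute (or another component's counter) is left untouched.
    static void clearRetryCount(final Map<String, String> attributes, final String componentId) {
        attributes.remove(retryCountAttributeName(componentId));
    }
}
```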

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -289,63 +292,112 @@ private void checkpoint(final boolean copyCollections) {
 
         // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
         final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>();
+        final Map<String, StandardRepositoryRecord> uuidsToRecords = records.values()
+                .stream()
+                .collect(Collectors.toMap(record -> record.getCurrent().getAttribute(CoreAttributes.UUID.key()), Function.identity()));
+
         for (final StandardRepositoryRecord record : records.values()) {
             if (record.isMarkedForDelete()) {
                 continue;
             }
 
+            if (records.get(record.getCurrent().getId()) == null) {
+                continue;
+            }
+
             final Relationship relationship = record.getTransferRelationship();
-            final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship));
 
-            if (destinations.isEmpty() && relationship == Relationship.SELF) {
-                record.setDestination(record.getOriginalQueue());
-            } else if (destinations.isEmpty()) {
-                record.markForDelete();
+            final Connectable connectable = context.getConnectable();
+            final FlowFileRecord currentFlowFile = record.getOriginal();
+            int retryCount = 0;
+            if (currentFlowFile != null) {
+                String retryCountString = currentFlowFile.getAttribute("retryCount");
+                if (retryCountString != null) {
+                    retryCount = Integer.parseInt(retryCountString);

Review comment:
       This should be wrapped in a try/catch: a `NumberFormatException` may well be thrown here if a user sets the attribute themselves.
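A minimal sketch of the suggested guard, pulled into a hypothetical `RetryCountParser` helper. Treating a missing, non-numeric, or negative value as zero retries is an assumption made here for illustration; a real implementation might also want to log a warning when the attribute holds something unexpected.

```java
// Hypothetical helper; in the PR the attribute is parsed inline in checkpoint().
final class RetryCountParser {
    private RetryCountParser() {
    }

    // Returns the retry count stored in the attribute value, or 0 when the
    // attribute is absent or does not hold a valid non-negative int
    // (for example, because a user set the attribute to something else).
    static int parseRetryCount(final String attributeValue) {
        if (attributeValue == null) {
            return 0;
        }
        try {
            final int parsed = Integer.parseInt(attributeValue.trim());
            return Math.max(parsed, 0);
        } catch (final NumberFormatException e) {
            // Non-numeric or out-of-range values fall back to "no retries yet".
            return 0;
        }
    }
}
```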

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
##########
@@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) {
             }
         }
     }
+
+    @Override
+    public int getRetryCount() {
+        return 0;
+    }
+
+    @Override
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        return Collections.EMPTY_SET;
+    }
+
+    @Override
+    public void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);

Review comment:
       There's no need to store this if we always return `emptySet()`. This can be a no-op.

##########
File path: 
nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
##########
@@ -169,6 +169,10 @@ private void compare(final VersionedProcessor processorA, final VersionedProcess
         addIfDifferent(differences, DifferenceType.SCHEDULED_STATE_CHANGED, processorA, processorB, VersionedProcessor::getScheduledState);
         addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, VersionedProcessor::getStyle);
         addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getYieldDuration);
+        addIfDifferent(differences, DifferenceType.RETRY_COUNTS_CHANGED, processorA, processorB, VersionedProcessor::getRetryCount);

Review comment:
       Should be `DifferenceType.RETRY_COUNT_CHANGED`.

##########
File path: 
nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java
##########
@@ -144,6 +144,10 @@ public VersionedProcessor createProcessor(final Bundle bundle, final String type
         processor.setType(type);
         processor.setYieldDuration("1 sec");
         processor.setSchedulingStrategy("TIMER_DRIVEN");
+        processor.setRetryCount(0);
+        processor.setBackoffMechanism("PENALIZE_FLOWFILE");
+        processor.setRetriedRelationships(new HashSet<>());

Review comment:
       Should probably use `Collections.emptySet()`

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
##########
@@ -659,4 +670,56 @@ public void setVersionedComponentId(final String versionedComponentId) {
             }
         }
     }
+
+    @Override
+    public int getRetryCount() {
+        return 0;
+    }
+
+    @Override
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        return Collections.EMPTY_SET;
+    }
+
+    @Override
+    public void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRelationshipRetried(Relationship relationship) {
+        return false;
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return BackoffMechanism.PENALIZE_FLOWFILE;
+    }
+
+    @Override
+    public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
+        this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism;

Review comment:
       There's no need to store this if we always return the same value. This can be a no-op.

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,117 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final Connectable connectable, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCount, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || connectable == null || connectable.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (connectable.isRelationshipRetried(record.getTransferRelationship())) {
+            return retryCount < connectable.getRetryCount();
+        }
+
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(currentFlowFile);
+        if (eventBuilder != null) {
+            for (String uuid : eventBuilder.getChildFlowFileIds()) {
+                if (connectable.isRelationshipRetried(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCount < connectable.getRetryCount();
+                }
+            }
+        }

Review comment:
       I'm a bit concerned about the use of provenance events here for determining whether or not a FlowFile needs to be retried. I can definitely appreciate why that association was made: it's the data model we have that also encapsulates this linkage. But provenance is really about "auditing", more so than about the logic of how FlowFiles within the session should be linked. This can also get very expensive if we are dealing with multiple splits. It probably makes sense to introduce a separate linkage between the FlowFiles, outside of the provenance events.
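A rough sketch of what such a separate linkage might look like, assuming the session records each parent/child relationship as it creates or clones FlowFiles. `FlowFileLinkage` and its methods are hypothetical and not part of NiFi. The retry check only needs to know whether a FlowFile, or any FlowFile derived from it in this session, was routed to a retried relationship; here that is supplied as a predicate over FlowFile UUIDs, so no provenance event needs to be consulted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical session-scoped index of parent -> child FlowFile links, kept
// independently of provenance events. Keys and values are FlowFile UUIDs.
final class FlowFileLinkage {
    private final Map<String, Set<String>> childrenByParent = new HashMap<>();

    // Records that childUuid was derived from parentUuid (e.g. a split or clone).
    void link(final String parentUuid, final String childUuid) {
        childrenByParent.computeIfAbsent(parentUuid, k -> new HashSet<>()).add(childUuid);
    }

    // Read-only view of the children recorded for a parent (empty if none).
    Set<String> getChildren(final String parentUuid) {
        return Collections.unmodifiableSet(childrenByParent.getOrDefault(parentUuid, Collections.emptySet()));
    }

    // True if the parent itself, or any FlowFile derived from it, was routed to a
    // relationship configured for retry, as reported by isRetriedRoute.
    boolean isRetryTriggered(final String parentUuid, final Predicate<String> isRetriedRoute) {
        if (isRetriedRoute.test(parentUuid)) {
            return true;
        }
        for (final String childUuid : getChildren(parentUuid)) {
            if (isRetriedRoute.test(childUuid)) {
                return true;
            }
        }
        return false;
    }
}
```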

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java
##########
@@ -80,6 +80,10 @@
     private static final String SCHEDULING_STRATEGY = "Scheduling Strategy";
     private static final String EXECUTION_NODE = "Execution Node";
     private static final String EXTENSION_VERSION = "Extension Version";
+    private static final String RETRY_COUNTS = "Retry Counts";

Review comment:
       Retry Count, rather than Counts :)

##########
File path: 
nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java
##########
@@ -144,6 +144,10 @@ public VersionedProcessor createProcessor(final Bundle bundle, final String type
         processor.setType(type);
         processor.setYieldDuration("1 sec");
         processor.setSchedulingStrategy("TIMER_DRIVEN");
+        processor.setRetryCount(0);
+        processor.setBackoffMechanism("PENALIZE_FLOWFILE");

Review comment:
       Should probably use `BackoffMechanism.PENALIZE_FLOWFILE.name()`
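A small sketch of why deriving the string from the enum is safer than a literal. `BackoffMechanism` here mirrors only the two constants that appear in this PR's diff, and `VersionedProcessorSketch` and `RetryDefaults` are hypothetical stand-ins for `VersionedProcessor` and the defaults set in `VersionedFlowBuilder`. If a constant were ever renamed, the `name()` call would stop compiling, instead of the builder writing a string that `BackoffMechanism.valueOf` can no longer parse.

```java
// Mirrors the two constants referenced in the diff; the real enum may define more.
enum BackoffMechanism {
    PENALIZE_FLOWFILE,
    YIELD_PROCESSOR
}

// Hypothetical stand-in for VersionedProcessor, which stores the mechanism as a String.
class VersionedProcessorSketch {
    private String backoffMechanism;

    void setBackoffMechanism(final String backoffMechanism) {
        this.backoffMechanism = backoffMechanism;
    }

    String getBackoffMechanism() {
        return backoffMechanism;
    }
}

// Hypothetical helper mirroring the retry defaults applied in createProcessor.
final class RetryDefaults {
    private RetryDefaults() {
    }

    static void apply(final VersionedProcessorSketch processor) {
        // Derived from the enum constant, so the stored string is always a valid constant name.
        processor.setBackoffMechanism(BackoffMechanism.PENALIZE_FLOWFILE.name());
    }
}
```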

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
##########
@@ -577,6 +588,58 @@ public String getComponentType() {
         return "Funnel";
     }
 
+    @Override
+    public int getRetryCount() {
+        return 0;
+    }
+
+    @Override
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        return Collections.EMPTY_SET;
+    }
+
+    @Override
+    public void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRelationshipRetried(Relationship relationship) {
+        return false;
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return BackoffMechanism.PENALIZE_FLOWFILE;
+    }
+
+    @Override
+    public void setBackoffMechanism(BackoffMechanism backoffMechanism) {
+        this.backoffMechanism = (backoffMechanism == null) ? BackoffMechanism.PENALIZE_FLOWFILE : backoffMechanism;
+    }
+
+    @Override
+    public String getMaxBackoffPeriod() {
+        return DEFAULT_MAX_BACKOFF_PERIOD;
+    }
+
+    @Override
+    public void setMaxBackoffPeriod(String maxBackoffPeriod) {
+        if (maxBackoffPeriod == null) {
+            maxBackoffPeriod = DEFAULT_MAX_BACKOFF_PERIOD;
+        }
+        final long backoffNanos = FormatUtils.getTimeDuration(maxBackoffPeriod, TimeUnit.NANOSECONDS);
+        if (backoffNanos < 0) {
+            throw new IllegalArgumentException("Max Backoff Period must be positive");
+        }
+        this.maxBackoffPeriod = maxBackoffPeriod;

Review comment:
       Again, there's no need to implement the setters or keep the member variables if we aren't going to make use of them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to