This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new fd3f070  NIFI-6677: Update ListHDFS to clear state (when appropriate) 
in an @OnScheduled method, just as AbstractListProcessor does, instead of doing 
it in onTrigger. Doing it in onTrigger is problematic because in a cluster, the 
Primary Node may run for some period of time, perhaps days or months. Then, 
when the Primary Node changes, onTrigger gets called for the first time on the 
new Primary Node, and this triggers the processor to clear state.
fd3f070 is described below

commit fd3f0707c66b78622ed4e7f5aeaff63580c46d44
Author: Mark Payne <[email protected]>
AuthorDate: Mon Sep 16 13:46:53 2019 -0400

    NIFI-6677: Update ListHDFS to clear state (when appropriate) in an 
@OnScheduled method, just as AbstractListProcessor does, instead of doing it in 
onTrigger. Doing it in onTrigger is problematic because in a cluster, the 
Primary Node may run for some period of time, perhaps days or months. Then, 
when the Primary Node changes, onTrigger gets called for the first time on the 
new Primary Node, and this triggers the processor to clear state.
---
 .../java/org/apache/nifi/processors/hadoop/ListHDFS.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 27f58ef..15ed4b1 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -33,6 +33,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -315,6 +316,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return toList;
     }
 
+    @OnScheduled
+    public void resetStateIfNecessary(final ProcessContext context) throws 
IOException {
+        if (resetState) {
+            getLogger().debug("Property has been modified. Resetting the state 
values - listing.timestamp and emitted.timestamp to -1L");
+            context.getStateManager().clear(Scope.CLUSTER);
+            this.resetState = false;
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         // We have to ensure that we don't continually perform listings, 
because if we perform two listings within
@@ -332,12 +342,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         // Ensure that we are using the latest listing information before we 
try to perform a listing of HDFS files.
         try {
-            if (resetState) {
-                getLogger().debug("Property has been modified. Resetting the 
state values - listing.timestamp and emitted.timestamp to -1L");
-                context.getStateManager().clear(Scope.CLUSTER);
-                this.resetState = false;
-            }
-
             final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
             if (stateMap.getVersion() == -1L) {
                 latestTimestampEmitted = -1L;

Reply via email to