tpalfy commented on a change in pull request #4916:
URL: https://github.com/apache/nifi/pull/4916#discussion_r597880479



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -735,13 +757,15 @@ private void processTailFile(final ProcessContext 
context, final ProcessSession
         if (!rotated) {
             final long fileLength = file.length();
             if (length > fileLength) {
+                getLogger().debug("Rotated = true because TailFile State 
Length = {}, File Length = {}", length, fileLength);

Review comment:
       Minor (a bit confusing otherwise)
   ```suggestion
                   getLogger().debug("Rotated = true because TailFileState Length = {}, File Length = {}", length, fileLength);
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -166,10 +168,21 @@
                     + "(without extension), and will assume that the files 
that have rolled over live in the same directory as the file being tailed. "
                     + "The same glob pattern will be used for all files.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(NONE)
             .required(false)
             .build();
 
+    static final PropertyDescriptor ROLLOVER_TAIL_PERIOD = new 
PropertyDescriptor.Builder()

Review comment:
       Minor: It may be just me, but "rollover" feels more like a verb 
than a noun/adjective, so "Rollover Tail Period" is a bit confusing. 
"Post-Rollover Tail Period" feels more natural and highlights the idea that we 
do something we usually do not do, _after_ the rollover.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -691,6 +705,14 @@ private void processTailFile(final ProcessContext context, 
final ProcessSession
             }
 
             rolloverOccurred = recoverRolledFiles(context, session, tailFile, 
expectedChecksumValue, tfo.getState().getTimestamp(), 
tfo.getState().getPosition());
+            if (rolloverOccurred) {
+                final boolean tailRollover = 
context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS) > 
0;

Review comment:
       Minor: Similar to the previous comment.
   We could use `tailAfterRollover` or even `tailRolledOver`, for example.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext 
context, final ProcessSe
                                         
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
                                 getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[]{flowFile, firstFile});
+                            }
 
+                            // We need to update the state to account for the 
fact that we just brought data in.
+                            // If we are going to tail a rolled over file for 
some amount of time, then we need to keep the state pointing to the
+                            // same file, just using an updated 
position/timestamp/checksum/length. This way, the next iteration will compare 
against these
+                            // updated values.
+                            // But if we are not going to tail the rolled over 
file for any period of time, we can essentially reset the state.
+                            final long rolloverTailMillis = 
context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);

Review comment:
       Minor
   ```suggestion
                               final long postRolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -311,6 +313,79 @@ private File rollover(final int index) throws IOException {
     }
 
 
+    @Test
+    public void testFileWrittenToAfterRollover() throws IOException, 
InterruptedException {
+        Assume.assumeTrue("Test requires renaming a file while a file handle 
is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.setProperty(TailFile.START_POSITION, 
TailFile.START_BEGINNING_OF_TIME.getValue());
+        runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+        runner.setProperty(TailFile.ROLLOVER_TAIL_PERIOD, "10 mins");
+
+        // first line fully written, second partially

Review comment:
       I'd rather remove these comments. The code is clear enough, and some of 
them already seem to be outdated.
   Take this one, for example: why do we consider the second line partial?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext 
context, final ProcessSe
                                         
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
                                 getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[]{flowFile, firstFile});
+                            }
 
+                            // We need to update the state to account for the 
fact that we just brought data in.
+                            // If we are going to tail a rolled over file for 
some amount of time, then we need to keep the state pointing to the
+                            // same file, just using an updated 
position/timestamp/checksum/length. This way, the next iteration will compare 
against these
+                            // updated values.
+                            // But if we are not going to tail the rolled over 
file for any period of time, we can essentially reset the state.
+                            final long rolloverTailMillis = 
context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+                            final long millisSinceUpdate = 
System.currentTimeMillis() - firstFile.lastModified();

Review comment:
       Minor: Use `timestamp` here so the reader is not left wondering what 
`timestamp` is for.
   ```suggestion
                               final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -311,6 +313,79 @@ private File rollover(final int index) throws IOException {
     }
 
 
+    @Test
+    public void testFileWrittenToAfterRollover() throws IOException, 
InterruptedException {
+        Assume.assumeTrue("Test requires renaming a file while a file handle 
is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.setProperty(TailFile.START_POSITION, 
TailFile.START_BEGINNING_OF_TIME.getValue());
+        runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+        runner.setProperty(TailFile.ROLLOVER_TAIL_PERIOD, "10 mins");
+
+        // first line fully written, second partially
+        raf.write("a\nb\n".getBytes());
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("a\nb\n");
+        runner.clearTransferState();
+
+        raf.write("c\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("c\n");
+        runner.clearTransferState();
+
+        // Write additional data to file, then roll file over
+        raf.write("d\n".getBytes());
+
+        final File rolledFile = new File("target/log.1");
+        final boolean renamed = file.renameTo(rolledFile);
+        assertTrue(renamed);
+        raf.getChannel().force(true);
+
+        System.out.println("Wrote d\\n and rolled file");
+
+        // Create the new file
+        final RandomAccessFile newFile = new RandomAccessFile(new 
File("target/log.txt"), "rw");
+        newFile.write("new file\n".getBytes()); // This should not get 
consumed until the old file's last modified date indicates it's complete
+
+        // Trigger processor and verify data is consumed properly
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("d\n");
+        runner.clearTransferState();
+
+        // Write to the file and trigger again.
+        raf.write("e\n".getBytes());
+        System.out.println("Wrote e\\n");
+        runner.run(1, false, false);
+
+        // There should be no data consumed because the last modified time is 
too recent.

Review comment:
       Another outdated(?) comment: "e" _has_ been consumed (although from the 
rolled-over file).

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext 
context, final ProcessSe
                                         
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
                                 getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[]{flowFile, firstFile});
+                            }
 
+                            // We need to update the state to account for the 
fact that we just brought data in.
+                            // If we are going to tail a rolled over file for 
some amount of time, then we need to keep the state pointing to the
+                            // same file, just using an updated 
position/timestamp/checksum/length. This way, the next iteration will compare 
against these
+                            // updated values.
+                            // But if we are not going to tail the rolled over 
file for any period of time, we can essentially reset the state.
+                            final long rolloverTailMillis = 
context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+                            final long millisSinceUpdate = 
System.currentTimeMillis() - firstFile.lastModified();
+                            if (rolloverTailMillis > 0 && millisSinceUpdate < 
rolloverTailMillis) {
+                                getLogger().debug("File {} has been rolled 
over, but it was updated {} millis ago, which is less than the configured 
Rollover Tail Period, so will continue " +
+                                    "tailing", firstFile, millisSinceUpdate);

Review comment:
       ```suggestion
                                   getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured " + ROLLOVER_TAIL_PERIOD.getDisplayName() +
                                       " ({} ms), so will continue tailing", firstFile, millisSinceUpdate, rolloverTailMillis);
   ```
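
   For reference, the condition in this hunk can be read in isolation as the 
small sketch below. It is only an illustration: the class 
`PostRolloverTailSketch` and the helper `shouldKeepTailing` are hypothetical 
names that do not exist in `TailFile`, and the sketch assumes that a period of 
0 means "do not tail after rollover", as the `> 0` check suggests.

   ```java
import java.util.concurrent.TimeUnit;

public class PostRolloverTailSketch {

    // Hypothetical helper (not part of TailFile): mirrors the condition
    // `rolloverTailMillis > 0 && millisSinceUpdate < rolloverTailMillis`.
    static boolean shouldKeepTailing(final long tailPeriodMillis, final long millisSinceUpdate) {
        // Assumption: a period of 0 disables tailing the rolled-over file.
        return tailPeriodMillis > 0 && millisSinceUpdate < tailPeriodMillis;
    }

    public static void main(final String[] args) {
        final long tenMinutes = TimeUnit.MINUTES.toMillis(10);

        // Updated one second ago, well inside the period.
        System.out.println(shouldKeepTailing(tenMinutes, 1_000L));      // prints "true"

        // Updated exactly one period ago: the comparison is strict.
        System.out.println(shouldKeepTailing(tenMinutes, tenMinutes));  // prints "false"

        // Period of 0: never keep tailing the rolled-over file.
        System.out.println(shouldKeepTailing(0L, 1_000L));              // prints "false"
    }
}
   ```

   Including the configured period in the debug message, as suggested above, 
makes both sides of this comparison visible in the log.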



