Repository: nifi
Updated Branches:
  refs/heads/master 3b15ed855 -> 45df23b1e


NIFI-4607, NIFI-3975, NIFI-4602, NIFI-4606: This closes #2272. Fixed bug in 
TailFile that caused new Primary Node to not pull current Clustered State when 
File Location was set to Remote. Fixed bug that caused TailFile to occasionally 
become 'stuck' when the file it is tailing is renamed and a new file is created 
with the same name. Removed the 'Rolling Strategy' property from TailFile 
because it is not actually used in the processor anymore. Deleted 
MonitorMemoryTest because the unit test was testing the behavior of 
FlowController more than the behavior of the reporting task itself and in order 
to do so had a dependency in the pom.xml on nifi-framework-core, which means 
that it no longer compiles when FlowController is modified.

Signed-off-by: joewitt <joew...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/45df23b1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/45df23b1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/45df23b1

Branch: refs/heads/master
Commit: 45df23b1e09f340fb2c7f3c3234462570ddb5f6b
Parents: 3b15ed8
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Nov 14 13:27:35 2017 -0500
Committer: joewitt <joew...@apache.org>
Committed: Thu Nov 30 11:10:46 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 205 +++++++++++--------
 .../additionalDetails.html                      |  23 +--
 .../nifi/processors/standard/TestTailFile.java  |  38 +++-
 .../nifi-standard-reporting-tasks/pom.xml       |   6 -
 .../nifi/controller/MonitorMemoryTest.java      | 167 ---------------
 5 files changed, 160 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/45df23b1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index f5d3409..2234265 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -16,39 +16,9 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.Stateful;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -77,6 +47,37 @@ import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
 // note: it is important that this Processor is not marked as 
@SupportsBatching because the session commits must complete before persisting 
state locally; otherwise, data loss may occur
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -89,9 +90,7 @@ import java.util.zip.Checksum;
         + "ingesting files that have been compressed when 'rolled over'.")
 @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state 
about where in the Tailed File it left off so that on restart it does not have 
to duplicate data. "
         + "State is stored either local or clustered depend on the <File 
Location> property.")
-@WritesAttributes({
-    @WritesAttribute(attribute = "tailfile.original.path", description = "Path 
of the original file the flow file comes from.")
-    })
+@WritesAttribute(attribute = "tailfile.original.path", description = "Path of 
the original file the flow file comes from.")
 @Restricted("Provides operator the ability to read from any file that NiFi has 
access to.")
 public class TailFile extends AbstractProcessor {
 
@@ -195,15 +194,6 @@ public class TailFile extends AbstractProcessor {
             .required(true)
             .build();
 
-    static final PropertyDescriptor ROLLING_STRATEGY = new 
PropertyDescriptor.Builder()
-            .name("tailfile-rolling-strategy")
-            .displayName("Rolling Strategy")
-            .description("Specifies if the files to tail have a fixed name or 
not.")
-            .required(true)
-            .allowableValues(FIXED_NAME, CHANGING_NAME)
-            .defaultValue(FIXED_NAME.getValue())
-            .build();
-
     static final PropertyDescriptor LOOKUP_FREQUENCY = new 
PropertyDescriptor.Builder()
             .name("tailfile-lookup-frequency")
             .displayName("Lookup frequency")
@@ -234,6 +224,7 @@ public class TailFile extends AbstractProcessor {
     private volatile Map<String, TailFileObject> states = new HashMap<String, 
TailFileObject>();
     private volatile AtomicLong lastLookup = new AtomicLong(0L);
     private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
+    private volatile boolean requireStateLookup = true;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -245,7 +236,6 @@ public class TailFile extends AbstractProcessor {
         properties.add(START_POSITION);
         properties.add(STATE_LOCATION);
         properties.add(RECURSIVE);
-        properties.add(ROLLING_STRATEGY);
         properties.add(LOOKUP_FREQUENCY);
         properties.add(MAXIMUM_AGE);
         return properties;
@@ -267,46 +257,31 @@ public class TailFile extends AbstractProcessor {
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
         final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(context));
 
-        
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
+        if 
(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
             String path = 
context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue();
-            if(path == null) {
-                results.add(new 
ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
-                        .explanation("Base directory property cannot be empty 
in Multifile mode.").build());
+            if (path == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject(BASE_DIRECTORY.getName())
+                    .valid(false)
+                    .explanation("Base directory property cannot be empty in 
Multifile mode.")
+                    .build());
             } else if (!new File(path).isDirectory()) {
-                results.add(new 
ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
-                            .explanation(path + " is not a 
directory.").build());
-            }
-
-            
if(context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue()))
 {
-                String freq = context.getProperty(LOOKUP_FREQUENCY).getValue();
-                if(freq == null) {
-                    results.add(new 
ValidationResult.Builder().subject(LOOKUP_FREQUENCY.getName()).valid(false)
-                            .explanation("In Multiple files mode and Changing 
name rolling strategy, lookup frequency "
-                                    + "property must be specified.").build());
-                }
-                String maxAge = context.getProperty(MAXIMUM_AGE).getValue();
-                if(maxAge == null) {
-                    results.add(new 
ValidationResult.Builder().subject(MAXIMUM_AGE.getName()).valid(false)
-                            .explanation("In Multiple files mode and Changing 
name rolling strategy, maximum age "
-                                    + "property must be specified.").build());
-                }
-            } else {
-                long max = context.getProperty(MAXIMUM_AGE).getValue() == null 
? Long.MAX_VALUE : 
context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-                List<String> filesToTail = 
getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(),
-                        
context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(),
-                        context.getProperty(RECURSIVE).asBoolean(),
-                        max);
-
-                if(filesToTail.isEmpty()) {
-                    results.add(new 
ValidationResult.Builder().subject(FILENAME.getName()).valid(false)
-                                .explanation("There is no file to tail. Files 
must exist when starting this processor.").build());
-                }
+                results.add(new ValidationResult.Builder()
+                    .subject(BASE_DIRECTORY.getName())
+                    .valid(false)
+                    .explanation(path + " is not a directory.")
+                    .build());
             }
         }
 
         return results;
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange() {
+        this.requireStateLookup = true;
+    }
+
     @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
         // set isMultiChanging
@@ -429,10 +404,10 @@ public class TailFile extends AbstractProcessor {
      * @return List of files to tail
      */
     private List<String> getFilesToTail(final String baseDir, String 
fileRegex, boolean isRecursive, long maxAge) {
-        Collection<File> files = FileUtils.listFiles(new File(baseDir), null, 
isRecursive);
-        List<String> result = new ArrayList<String>();
+        final Collection<File> files = FileUtils.listFiles(new File(baseDir), 
null, isRecursive);
+        final List<String> result = new ArrayList<String>();
 
-        String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? 
baseDir.substring(0, baseDir.length() -1) : baseDir;
+        final String baseDirNoTrailingSeparator = 
baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() - 1) : 
baseDir;
         final String fullRegex;
         if (File.separator.equals("/")) {
             // handle unix-style paths
@@ -441,13 +416,13 @@ public class TailFile extends AbstractProcessor {
             // handle windows-style paths, need to quote backslash characters
             fullRegex = baseDirNoTrailingSeparator + 
Pattern.quote(File.separator) + fileRegex;
         }
-        Pattern p = Pattern.compile(fullRegex);
+        final Pattern p = Pattern.compile(fullRegex);
 
-        for(File file : files) {
-            String path = file.getPath();
-            if(p.matcher(path).matches()) {
-                if(isMultiChanging.get()) {
-                    if((new Date().getTime() - file.lastModified()) < maxAge) {
+        for (File file : files) {
+            final String path = file.getPath();
+            if (p.matcher(path).matches()) {
+                if (isMultiChanging.get()) {
+                    if ((new Date().getTime() - file.lastModified()) < maxAge) 
{
                         result.add(path);
                     }
                 } else {
@@ -515,7 +490,14 @@ public class TailFile extends AbstractProcessor {
             if (existingTailFile.length() >= position) {
                 try (final InputStream tailFileIs = new 
FileInputStream(existingTailFile);
                         final CheckedInputStream in = new 
CheckedInputStream(tailFileIs, checksum)) {
-                    StreamUtils.copy(in, new NullOutputStream(), 
states.get(filePath).getState().getPosition());
+
+                    try {
+                        StreamUtils.copy(in, new NullOutputStream(), 
states.get(filePath).getState().getPosition());
+                    } catch (final EOFException eof) {
+                        // If we hit EOFException, then the file is smaller 
than we expected. Assume rollover.
+                        getLogger().debug("When recovering state, file being 
tailed has less data than was stored in the state. "
+                            + "Assuming rollover. Will begin tailing current 
file from beginning.");
+                    }
 
                     final long checksumResult = in.getChecksum().getValue();
                     if (checksumResult == 
states.get(filePath).getExpectedRecoveryChecksum()) {
@@ -589,16 +571,30 @@ public class TailFile extends AbstractProcessor {
                 try {
                     recoverState(context);
                 } catch (IOException e) {
-                    getLogger().error("Exception raised while looking up for 
new files", e);
+                    getLogger().error("Exception raised while attempting to 
recover state about where the tailing last left off", e);
                     context.yield();
                     return;
                 }
             }
         }
-        if(states.isEmpty()) {
+
+        if (requireStateLookup) {
+            try {
+                recoverState(context);
+            } catch (IOException e) {
+                getLogger().error("Exception raised while attempting to 
recover state about where the tailing last left off", e);
+                context.yield();
+                return;
+            }
+
+            requireStateLookup = false;
+        }
+
+        if (states.isEmpty()) {
             context.yield();
             return;
         }
+
         for (String tailFile : states.keySet()) {
             processTailFile(context, session, tailFile);
         }
@@ -688,9 +684,38 @@ public class TailFile extends AbstractProcessor {
         final long startNanos = System.nanoTime();
 
         // Check if file has rotated
-        if (rolloverOccurred
-                || (timestamp <= file.lastModified() && length > 
file.length())) {
+        // We determine that the file has rotated if any of the following 
conditions are met:
+        // 1. 'rolloverOccurred' == true, which indicates that we have found a 
new file matching the rollover pattern.
+        // 2. The file was modified after the timestamp in our state, AND the 
file is smaller than we expected. This satisfies
+        //    the case where we are tailing File A, and that file is then 
renamed (say to B) and a new file named A is created
+        //    and is written to. In such a case, File A may have a file size 
smaller than we have in our state, so we know that
+        //    it rolled over.
+        // 3. The File Channel that we have indicates that the size of the 
file is different than file.length() indicates, AND
+        //    the File Channel also indicates that we have read all data in 
the file. This case may also occur in the same scenario
+        //    as #2, above. In this case, the File Channel is pointing to File 
A, but the 'file' object is pointing to File B. They
+        //    both have the same name but are different files. As a result, 
once we have consumed all data from the File Channel,
+        //    we want to roll over and consume data from the new file.
+        boolean rotated = rolloverOccurred;
+        if (!rotated) {
+            final long fileLength = file.length();
+            if (length > fileLength) {
+                rotated = true;
+            } else {
+                try {
+                    final long readerSize = reader.size();
+                    final long readerPosition = reader.position();
+
+                    if (readerSize == readerPosition && readerSize != 
fileLength) {
+                        rotated = true;
+                    }
+                } catch (final IOException e) {
+                    getLogger().warn("Failed to determined the size or 
position of the File Channel when "
+                        + "determining if the file has rolled over. Will 
assume that the file being tailed has not rolled over", e);
+                }
+            }
+        }
 
+        if (rotated) {
             // Since file has rotated, we close the reader, create a new one, 
and then reset our state.
             try {
                 reader.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/45df23b1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
index 87767a5..75fb867 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
@@ -105,28 +105,23 @@
                and new log messages are always appended in log file of the 
current day.
         </p>
         <p>
-               If the processor is configured with '<b>Multiple files</b>' 
mode and '<b>Fixed name</b>' rolling strategy, the processor will list the 
-               files to tail in the 'Base directory' (recursively or not) and 
matching the regular expression. This listing will only occur once when 
-               the processor is started. In this configuration, the processor 
will act as in 'Single file' mode for all files listed by the processor 
-               when started.
-        </p>
-        <p>
-               If the processor is configured with '<b>Multiple files</b>' 
mode and '<b>Changing name</b>' rolling strategy, two new properties are 
-               mandatory:
+               If the processor is configured with '<b>Multiple files</b>' 
mode, two additional properties are relevant:
         </p>
         <ul>
                <li><b>Lookup frequency</b>: specifies the minimum duration the 
processor will wait before listing again the files to tail.</li>
                <li><b>Maximum age</b>: specifies the necessary minimum 
duration to consider that no new messages will be appended in a file 
-               regarding its last modification date.</li>
+               regarding its last modification date. If the amount of time 
that has elapsed since the file was modified is larger than this
+               period of time, the file will not be tailed. For example, if a 
file was modified 24 hours ago and this property is set to 12 hours,
+               the file will not be tailed. But if this property is set to 36 
hours, then the file will continue to be tailed.</li>
         </ul>
         <p>
-               It is necessary to pay attention to 'Lookup frequency' and 
'Maximum age' properties as well as the frequency at which the processor is 
-               triggered in order to keep good performances. It is recommended 
to keep 'Maximum age' > 'Lookup frequency' > processor scheduling 
-               frequency to avoid loosing messages. It also recommended not to 
set 'Maximum Age' too low because if messages are appended in a file 
-               after this file has been considered "too old", all the messages 
in the file may be read again leading to data duplication.
+               It is necessary to pay attention to 'Lookup frequency' and 
'Maximum age' properties, as well as the frequency at which the processor is 
+               triggered, in order to achieve high performance. It is 
recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling 
+               frequency to avoid missing data. It is also recommended not to set 
'Maximum Age' too low because if messages are appended in a file 
+               after this file has been considered "too old", all the messages 
in the file may be read again, leading to data duplication.
         </p>
         <p>
-               Besides, if the processor is configured with '<b>Multiple 
files</b>' mode and '<b>Changing name</b>' rolling strategy, the 'Rolling 
+               If the processor is configured with '<b>Multiple files</b>' 
mode, the 'Rolling 
                filename pattern' property must be specific enough to ensure 
that only the rolling files will be listed and not other currently tailed
                files in the same directory (this can be achieved using 
${filename} tag).
         </p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/45df23b1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 485b8e3..4c82703 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -320,6 +320,42 @@ public class TestTailFile {
     }
 
     @Test
+    public void testRolloverWriteMoreDataThanPrevious() throws IOException, 
InterruptedException {
+        // If we have read all data in a file, and that file does not end with 
a new-line, then the last line
+        // in the file will have been read, added to the checksum, and then we 
would re-seek to "unread" that
+        // last line since it didn't have a new-line. We need to ensure that 
if the data is then rolled over
+        // that our checksum does not take into account those bytes that have 
been "unread."
+
+        // this mimics the case when we are reading a log file that rolls over 
while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, true, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // 
should not pull in data because no \n
+
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("longer than hello\n".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("longer
 than hello\n");
+    }
+
+
+    @Test
     public void testMultipleRolloversAfterHavingReadAllData() throws 
IOException, InterruptedException {
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
@@ -670,7 +706,6 @@ public class TestTailFile {
         runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
         runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
         runner.setProperty(TailFile.RECURSIVE, "false");
-        runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.FIXED_NAME);
 
         initializeFile("target/log_1.txt", "firstLine\n");
 
@@ -795,7 +830,6 @@ public class TestTailFile {
     public void testMultipleFilesChangingNameStrategy() throws IOException, 
InterruptedException {
         runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_FILE);
         runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
-        runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.CHANGING_NAME);
         runner.setProperty(TailFile.BASE_DIRECTORY, "target");
         runner.setProperty(TailFile.FILENAME, ".*app-.*.log");
         runner.setProperty(TailFile.LOOKUP_FREQUENCY, "2s");

http://git-wip-us.apache.org/repos/asf/nifi/blob/45df23b1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
index b408910..722eef2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
@@ -54,12 +54,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-framework-core</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-nar-utils</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/45df23b1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
deleted file mode 100644
index 815b855..0000000
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.authorization.Authorizer;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.controller.repository.FlowFileEventRepository;
-import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.SystemBundle;
-import org.apache.nifi.provenance.MockProvenanceRepository;
-import org.apache.nifi.util.CapturingLogger;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.Tuple;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class MonitorMemoryTest {
-
-    private FlowController fc;
-    private Bundle systemBundle;
-
-    @Before
-    public void before() throws Exception {
-        System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
-        final Map<String, String> addProps = new HashMap<>();
-        addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
-        addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
-        addProps.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider");
-
-        final Tuple<FlowController, Bundle> tuple = this.buildFlowControllerForTest(addProps);
-        fc = tuple.getKey();
-        systemBundle = tuple.getValue();
-    }
-
-    @After
-    public void after() throws Exception {
-        fc.shutdown(true);
-        FileUtils.deleteDirectory(new File("./target/test-repo"));
-        FileUtils.deleteDirectory(new File("./target/content_repository"));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void validatevalidationKicksInOnWrongPoolNames() throws Exception {
-        ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName(), systemBundle.getBundleDetails().getCoordinate());
-
-        Map<String,String> props = new HashMap<>();
-        props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "foo");
-        reportingTask.setProperties(props);
-        ProcessScheduler ps = fc.getProcessScheduler();
-        ps.schedule(reportingTask);
-    }
-
-    @Test
-    @Ignore // temporarily ignoring it since it fails intermittently due to
-    // unpredictability during full build
-    // still keeping it for local testing
-    public void validateWarnWhenPercentThresholdReached() throws Exception {
-        this.doValidate("10%");
-    }
-
-    /*
-     * We're ignoring this tests as it is practically impossible to run it
-     * reliably together with automated Maven build since we can't control the
-     * state of the JVM on each machine during the build. However, you can run
-     * it selectively for further validation
-     */
-    @Test
-    @Ignore
-    public void validateWarnWhenSizeThresholdReached() throws Exception {
-        this.doValidate("10 MB");
-    }
-
-    public void doValidate(String threshold) throws Exception {
-        CapturingLogger capturingLogger = this.wrapAndReturnCapturingLogger();
-        ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName(), systemBundle.getBundleDetails().getCoordinate());
-        reportingTask.setSchedulingPeriod("1 sec");
-
-        Map<String,String> props = new HashMap<>();
-        props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "PS Old Gen");
-        props.put(MonitorMemory.REPORTING_INTERVAL.getName(), "100 millis");
-        props.put(MonitorMemory.THRESHOLD_PROPERTY.getName(), threshold);
-        reportingTask.setProperties(props);
-
-        ProcessScheduler ps = fc.getProcessScheduler();
-        ps.schedule(reportingTask);
-
-        Thread.sleep(2000);
-        // ensure no memory warning were issued
-        assertTrue(capturingLogger.getWarnMessages().size() == 0);
-
-        // throw something on the heap
-        @SuppressWarnings("unused")
-        byte[] b = new byte[Integer.MAX_VALUE / 3];
-        Thread.sleep(200);
-        assertTrue(capturingLogger.getWarnMessages().size() > 0);
-        assertTrue(capturingLogger.getWarnMessages().get(0).getMsg()
-                .startsWith("Memory Pool 'PS Old Gen' has exceeded the configured Threshold of " + threshold));
-
-        // now try to clear the heap and see memory being reclaimed
-        b = null;
-        System.gc();
-        Thread.sleep(1000);
-        assertTrue(capturingLogger.getInfoMessages().get(0).getMsg().startsWith(
-                "Memory Pool 'PS Old Gen' is no longer exceeding the configured Threshold of " + threshold));
-    }
-
-    private CapturingLogger wrapAndReturnCapturingLogger() throws Exception {
-        Field loggerField = MonitorMemory.class.getDeclaredField("logger");
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(loggerField, loggerField.getModifiers() & ~Modifier.FINAL);
-
-        loggerField.setAccessible(true);
-        CapturingLogger capturingLogger = new CapturingLogger((Logger) loggerField.get(null));
-        loggerField.set(null, capturingLogger);
-        return capturingLogger;
-    }
-
-    private Tuple<FlowController, Bundle> buildFlowControllerForTest(final Map<String, String> addProps) throws Exception {
-        addProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName());
-        addProps.put("nifi.remote.input.socket.port", "");
-        addProps.put("nifi.remote.input.secure", "");
-        final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps);
-
-        // build the system bundle
-        final Bundle bundle = SystemBundle.create(nifiProperties);
-        ExtensionManager.discoverExtensions(bundle, Collections.emptySet());
-
-        return new Tuple<>(FlowController.createStandaloneInstance(
-                mock(FlowFileEventRepository.class),
-                nifiProperties,
-                mock(Authorizer.class),
-                mock(AuditService.class),
-                null,
-                null,
-                null), bundle);
-    }
-}

Reply via email to