This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 89b618cc05 NIFI-12772 Expose REMOTE_POLL_BATCH_SIZE property for ListSFTP
89b618cc05 is described below

commit 89b618cc052a19bb4bcdbe8f7067ad996aad614d
Author: Tom Brisland <tombrisl...@gmail.com>
AuthorDate: Fri Feb 9 23:13:35 2024 +0000

    NIFI-12772 Expose REMOTE_POLL_BATCH_SIZE property for ListSFTP
    
    This closes #8390.
    
    revert max results optimisation + unnecessary import
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../util/file/transfer/ListFileTransfer.java       |  9 +-----
 .../apache/nifi/processors/standard/ListSFTP.java  |  3 +-
 .../nifi/processors/standard/TestListSFTP.java     | 37 +++++++++++++++-------
 3 files changed, 29 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java
index 1106b27fa3..af10d592d9 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java
@@ -31,7 +31,6 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -124,13 +123,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
             return listing;
         }
 
-        final Iterator<FileInfo> itr = listing.iterator();
-        while (itr.hasNext()) {
-            final FileInfo next = itr.next();
-            if (next.getLastModifiedTime() < minTimestamp) {
-                itr.remove();
-            }
-        }
+        listing.removeIf(file -> file.getLastModifiedTime() < minTimestamp);
 
         return listing;
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 16a4e8e14e..1d3e498a0c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -100,6 +100,7 @@ public class ListSFTP extends ListFileTransfer {
         properties.add(SFTPTransfer.FILE_FILTER_REGEX);
         properties.add(SFTPTransfer.PATH_FILTER_REGEX);
         properties.add(SFTPTransfer.IGNORE_DOTTED_FILES);
+        properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE);
         properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
         properties.add(SFTPTransfer.HOST_KEY_FILE);
         properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
@@ -176,7 +177,7 @@ public class ListSFTP extends ListFileTransfer {
         final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
 
         return (attributes) -> {
-            if(attributes.isDirectory()) {
+            if (attributes.isDirectory()) {
                 return true;
             }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index f2340238c1..2ae2b78335 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -16,16 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -45,6 +35,16 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestListSFTP {
@@ -94,7 +94,7 @@ public class TestListSFTP {
         
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE);
         
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE);
         
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE);
-        runner.assertAllFlowFilesContainAttribute( "filename");
+        runner.assertAllFlowFilesContainAttribute("filename");
 
         final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
         retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
@@ -178,6 +178,21 @@ public class TestListSFTP {
         runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
     }
 
+    @Test
+    public void testRemotePollBatchSizeEnforced() {
+        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
+        runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "1");
+
+        runner.run();
+        // Of 3 items only 1 returned due to batch size
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
+
+        runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");
+
+        runner.run();
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
+    }
+
     @Test
     public void testVerificationSuccessful() {
         final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())

Reply via email to