This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 62487f575a NIFI-11891 Added No Tracking listing strategy to ListGCS
62487f575a is described below

commit 62487f575ab89328d37b8486bb00b7109023289e
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Aug 3 16:25:07 2023 +0200

    NIFI-11891 Added No Tracking listing strategy to ListGCS
    
    This closes #7570
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/processors/gcp/storage/ListGCSBucket.java | 82 ++++++++++++++++++++--
 .../processors/gcp/storage/ListGCSBucketTest.java  | 49 +++++++++++++
 2 files changed, 124 insertions(+), 7 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 7a5eecb22e..1b156aac71 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -182,12 +182,17 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             " However an additional DistributedMapCache controller service is 
required and more JVM heap memory is used." +
             " For more information on how the 'Entity Tracking Time Window' 
property works, see the description.");
 
+    public static final AllowableValue NO_TRACKING = new 
AllowableValue("none", "No Tracking",
+            "This strategy lists all entities without any tracking. The same 
entities will be listed each time" +
+                    " this processor is scheduled. It is recommended to change 
the default run schedule value." +
+                    " Any property that relates to the persisting state will 
be ignored.");
+
     public static final PropertyDescriptor LISTING_STRATEGY = new 
PropertyDescriptor.Builder()
         .name("listing-strategy")
         .displayName("Listing Strategy")
         .description("Specify how to determine new/updated entities. See each 
strategy descriptions for detail.")
         .required(true)
-        .allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
+        .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
         .defaultValue(BY_TIMESTAMPS.getValue())
         .build();
 
@@ -291,7 +296,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             try {
                 listedEntityTracker.clearListedEntities();
             } catch (IOException e) {
-                throw new RuntimeException("Failed to reset previously listed 
entities due to " + e, e);
+                throw new RuntimeException("Failed to reset previously listed 
entities", e);
             }
         }
         resetEntityTrackingState = false;
@@ -396,11 +401,31 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             listByTrackingTimestamps(context, session);
         } else if (BY_ENTITIES.equals(listingStrategy)) {
             listByTrackingEntities(context, session);
+        } else if (NO_TRACKING.equals(listingStrategy)) {
+            listNoTracking(context, session);
         } else {
             throw new ProcessException("Unknown listing strategy: " + 
listingStrategy);
         }
     }
 
+    private void listNoTracking(ProcessContext context, ProcessSession 
session) {
+        final long startNanos = System.nanoTime();
+        final ListingAction listingAction = new 
NoTrackingListingAction(context, session);
+
+        try {
+            listBucket(context, listingAction);
+        } catch (final Exception e) {
+            getLogger().error("Failed to list contents of GCS Bucket", e);
+            listingAction.getBlobWriter().finishListingExceptionally(e);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
+        final long listMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed GCS bucket {} in {} millis", 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), 
listMillis);
+    }
+
     private void listByTrackingTimestamps(ProcessContext context, 
ProcessSession session) {
         try {
             restoreState(session);
@@ -416,7 +441,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         try {
             listBucket(context, listingAction);
         } catch (final Exception e) {
-            getLogger().error("Failed to list contents of GCS Bucket due to 
{}", new Object[] {e}, e);
+            getLogger().error("Failed to list contents of GCS Bucket", e);
             listingAction.getBlobWriter().finishListingExceptionally(e);
             session.rollback();
             context.yield();
@@ -424,7 +449,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         }
 
         final long listMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully listed GCS bucket {} in {} millis", new 
Object[]{ 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), 
listMillis });
+        getLogger().info("Successfully listed GCS bucket {} in {} millis", 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), 
listMillis);
     }
 
     private void listBucket(final ProcessContext context, final ListingAction 
listingAction) throws IOException, SchemaNotFoundException {
@@ -524,7 +549,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
 
     private void commit(final ProcessSession session, final int listCount) {
         if (listCount > 0) {
-            getLogger().info("Successfully listed {} new files from GCS; 
routing to success", new Object[] {listCount});
+            getLogger().info("Successfully listed {} new files from GCS; 
routing to success", listCount);
             session.commitAsync();
         }
     }
@@ -541,6 +566,49 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         void commit(int listCount);
     }
 
+    private class NoTrackingListingAction implements ListingAction<BlobWriter> 
{
+        final ProcessContext context;
+        final ProcessSession session;
+        final BlobWriter blobWriter;
+
+        private NoTrackingListingAction(final ProcessContext context, final 
ProcessSession session) {
+            this.context = context;
+            this.session = session;
+
+            final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                blobWriter = new AttributeBlobWriter(session);
+            } else {
+                blobWriter = new RecordBlobWriter(session, writerFactory, 
getLogger());
+            }
+        }
+
+        @Override
+        public boolean skipBlob(final Blob blob) {
+            return false;
+        }
+
+        @Override
+        public void commit(final int listCount) {
+            ListGCSBucket.this.commit(session, listCount);
+        }
+
+        @Override
+        public BlobWriter getBlobWriter() {
+            return blobWriter;
+        }
+
+        @Override
+        public Storage getCloudService() {
+            return ListGCSBucket.this.getCloudService();
+        }
+
+        @Override
+        public void finishListing(final int listCount, final long 
maxTimestamp, final Set<String> keysMatchingTimestamp) {
+            // nothing to do
+        }
+    }
+
     private class TriggerListingAction implements ListingAction<BlobWriter> {
         final ProcessContext context;
         final ProcessSession session;
@@ -645,7 +713,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
 
                 writer.finishListing();
             } catch (final Exception e) {
-                getLogger().error("Failed to list contents of bucket due to 
{}", new Object[] {e}, e);
+                getLogger().error("Failed to list contents of bucket", e);
                 writer.finishListingExceptionally(e);
                 session.rollback();
                 context.yield();
@@ -827,7 +895,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             try {
                 recordWriter.close();
             } catch (IOException e) {
-                logger.error("Failed to write listing as Records due to {}", 
new Object[] {e}, e);
+                logger.error("Failed to write listing as Records", e);
             }
 
             session.remove(flowFile);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index 5b997342e1..c1c4b2a8c6 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -315,6 +315,55 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals(Collections.singleton("blob-key-2"), 
processor.getStateKeys());
     }
 
+    @Test
+    public void testNoTrackingListing() throws Exception {
+        reset(storage, mockBlobPage);
+        final ListGCSBucket processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
+        addRequiredPropertiesToRunner(runner);
+        runner.setProperty(ListGCSBucket.LISTING_STRATEGY, 
ListGCSBucket.NO_TRACKING);
+        runner.assertValid();
+
+        final Iterable<Blob> mockList = Arrays.asList(
+                buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+                buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
+        );
+
+        when(mockBlobPage.getValues()).thenReturn(mockList);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
+        when(storage.list(anyString(), 
any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
+
+        runner.run();
+
+        when(storage.testIamPermissions(anyString(), 
any())).thenReturn(Collections.singletonList(true));
+
+        runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+        runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+        verifyConfigVerification(runner, processor, 2);
+
+        final List<MockFlowFile> successes = 
runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+        MockFlowFile flowFile = successes.get(0);
+        assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-1", flowFile.getAttribute(KEY_ATTR));
+        assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
+
+        flowFile = successes.get(1);
+        assertEquals("blob-bucket-2", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-2",flowFile.getAttribute(KEY_ATTR));
+        assertEquals("3", flowFile.getAttribute(UPDATE_TIME_ATTR));
+
+        runner.clearTransferState();
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+        runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+
+        assertEquals(0, processor.getStateTimestamp());
+        assertEquals(0, processor.getStateKeys().size());
+    }
+
     @Test
     public void testOldValues() throws Exception {
         reset(storage, mockBlobPage);

Reply via email to