This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 62487f575a NIFI-11891 Added No Tracking listing strategy to ListGCS
62487f575a is described below
commit 62487f575ab89328d37b8486bb00b7109023289e
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Aug 3 16:25:07 2023 +0200
NIFI-11891 Added No Tracking listing strategy to ListGCS
This closes #7570
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/gcp/storage/ListGCSBucket.java | 82 ++++++++++++++++++++--
.../processors/gcp/storage/ListGCSBucketTest.java | 49 +++++++++++++
2 files changed, 124 insertions(+), 7 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 7a5eecb22e..1b156aac71 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -182,12 +182,17 @@ public class ListGCSBucket extends AbstractGCSProcessor {
" However an additional DistributedMapCache controller service is
required and more JVM heap memory is used." +
" For more information on how the 'Entity Tracking Time Window'
property works, see the description.");
+ public static final AllowableValue NO_TRACKING = new
AllowableValue("none", "No Tracking",
+ "This strategy lists all entities without any tracking. The same
entities will be listed each time" +
+ " this processor is scheduled. It is recommended to change
the default run schedule value." +
+ " Any property that relates to the persisting state will
be ignored."); // third choice offered by LISTING_STRATEGY, next to BY_TIMESTAMPS and BY_ENTITIES
+
public static final PropertyDescriptor LISTING_STRATEGY = new
PropertyDescriptor.Builder()
.name("listing-strategy")
.displayName("Listing Strategy")
.description("Specify how to determine new/updated entities. See each
strategy descriptions for detail.")
.required(true)
- .allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
+ .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
.defaultValue(BY_TIMESTAMPS.getValue())
.build();
@@ -291,7 +296,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
try {
listedEntityTracker.clearListedEntities();
} catch (IOException e) {
- throw new RuntimeException("Failed to reset previously listed
entities due to " + e, e);
+ throw new RuntimeException("Failed to reset previously listed
entities", e);
}
}
resetEntityTrackingState = false;
@@ -396,11 +401,31 @@ public class ListGCSBucket extends AbstractGCSProcessor {
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (NO_TRACKING.equals(listingStrategy)) {
+ listNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ private void listNoTracking(ProcessContext context, ProcessSession
session) { // handles the NO_TRACKING strategy; unlike listByTrackingTimestamps, no restoreState(...) call is made here
+ final long startNanos = System.nanoTime(); // start time for the elapsed-millis value logged on success
+ final ListingAction listingAction = new
NoTrackingListingAction(context, session); // its skipBlob() always returns false and its finishListing() does nothing
+
+ try {
+ listBucket(context, listingAction);
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of GCS Bucket", e);
+ listingAction.getBlobWriter().finishListingExceptionally(e); // hand the failure to the blob writer before rolling back
+ session.rollback();
+ context.yield();
+ return; // failure path: the success message below is not logged
+ }
+
+ final long listMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); // elapsed time of this method so far, in milliseconds
+ getLogger().info("Successfully listed GCS bucket {} in {} millis",
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(),
listMillis);
+ }
+
private void listByTrackingTimestamps(ProcessContext context,
ProcessSession session) {
try {
restoreState(session);
@@ -416,7 +441,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
try {
listBucket(context, listingAction);
} catch (final Exception e) {
- getLogger().error("Failed to list contents of GCS Bucket due to
{}", new Object[] {e}, e);
+ getLogger().error("Failed to list contents of GCS Bucket", e);
listingAction.getBlobWriter().finishListingExceptionally(e);
session.rollback();
context.yield();
@@ -424,7 +449,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
final long listMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- getLogger().info("Successfully listed GCS bucket {} in {} millis", new
Object[]{
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(),
listMillis });
+ getLogger().info("Successfully listed GCS bucket {} in {} millis",
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(),
listMillis);
}
private void listBucket(final ProcessContext context, final ListingAction
listingAction) throws IOException, SchemaNotFoundException {
@@ -524,7 +549,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
private void commit(final ProcessSession session, final int listCount) {
if (listCount > 0) {
- getLogger().info("Successfully listed {} new files from GCS;
routing to success", new Object[] {listCount});
+ getLogger().info("Successfully listed {} new files from GCS;
routing to success", listCount);
session.commitAsync();
}
}
@@ -541,6 +566,49 @@ public class ListGCSBucket extends AbstractGCSProcessor {
void commit(int listCount);
}
+ private class NoTrackingListingAction implements ListingAction<BlobWriter>
{ // ListingAction used by listNoTracking: never asks for a blob to be skipped and does nothing when a listing finishes
+ final ProcessContext context;
+ final ProcessSession session;
+ final BlobWriter blobWriter; // chosen once in the constructor, returned by getBlobWriter()
+
+ private NoTrackingListingAction(final ProcessContext context, final
ProcessSession session) {
+ this.context = context;
+ this.session = session;
+
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) { // RECORD_WRITER did not resolve to a controller service
+ blobWriter = new AttributeBlobWriter(session);
+ } else {
+ blobWriter = new RecordBlobWriter(session, writerFactory,
getLogger());
+ }
+ }
+
+ @Override
+ public boolean skipBlob(final Blob blob) {
+ return false; // unconditional: the blob argument is not inspected
+ }
+
+ @Override
+ public void commit(final int listCount) {
+ ListGCSBucket.this.commit(session, listCount); // delegates to the enclosing processor's commit(session, listCount)
+ }
+
+ @Override
+ public BlobWriter getBlobWriter() {
+ return blobWriter;
+ }
+
+ @Override
+ public Storage getCloudService() {
+ return ListGCSBucket.this.getCloudService(); // delegates to the enclosing processor
+ }
+
+ @Override
+ public void finishListing(final int listCount, final long
maxTimestamp, final Set<String> keysMatchingTimestamp) {
+ // nothing to do: listCount, maxTimestamp and keysMatchingTimestamp are all ignored here
+ }
+ }
+
private class TriggerListingAction implements ListingAction<BlobWriter> {
final ProcessContext context;
final ProcessSession session;
@@ -645,7 +713,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
writer.finishListing();
} catch (final Exception e) {
- getLogger().error("Failed to list contents of bucket due to
{}", new Object[] {e}, e);
+ getLogger().error("Failed to list contents of bucket", e);
writer.finishListingExceptionally(e);
session.rollback();
context.yield();
@@ -827,7 +895,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
try {
recordWriter.close();
} catch (IOException e) {
- logger.error("Failed to write listing as Records due to {}",
new Object[] {e}, e);
+ logger.error("Failed to write listing as Records", e);
}
session.remove(flowFile);
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index 5b997342e1..c1c4b2a8c6 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -315,6 +315,55 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals(Collections.singleton("blob-key-2"),
processor.getStateKeys());
}
+ @Test
+ public void testNoTrackingListing() throws Exception { // runs the processor twice with NO_TRACKING and expects the same two blobs both times
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.setProperty(ListGCSBucket.LISTING_STRATEGY,
ListGCSBucket.NO_TRACKING);
+ runner.assertValid();
+
+ final Iterable<Blob> mockList = Arrays.asList(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
+ );
+
+ when(mockBlobPage.getValues()).thenReturn(mockList);
+ when(mockBlobPage.getNextPage()).thenReturn(null); // single page: there is no next page
+ when(storage.list(anyString(),
any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
+
+ runner.run(); // first run
+
+ when(storage.testIamPermissions(anyString(),
any())).thenReturn(Collections.singletonList(true)); // presumably needed by verifyConfigVerification below - TODO confirm
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+ verifyConfigVerification(runner, processor, 2);
+
+ final List<MockFlowFile> successes =
runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR));
+ assertEquals("blob-key-1", flowFile.getAttribute(KEY_ATTR));
+ assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
+
+ flowFile = successes.get(1);
+ assertEquals("blob-bucket-2", flowFile.getAttribute(BUCKET_ATTR));
+ assertEquals("blob-key-2",flowFile.getAttribute(KEY_ATTR));
+ assertEquals("3", flowFile.getAttribute(UPDATE_TIME_ATTR));
+
+ runner.clearTransferState(); // drop the first run's results so the next assertions only see the second run
+
+ runner.run(); // second run against the same mocked listing
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2); // both blobs are listed again
+
+ assertEquals(0, processor.getStateTimestamp()); // after two runs the processor reports a state timestamp of 0
+ assertEquals(0, processor.getStateKeys().size()); // and no state keys
+ }
+
@Test
public void testOldValues() throws Exception {
reset(storage, mockBlobPage);