This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new db7e073368 NIFI-13203: Include s3.url attribute in FetchS3Object, PutS3Object This closes #8795 db7e073368 is described below commit db7e073368eeec2a17da73f74415d64d81e55458 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Thu May 9 17:02:18 2024 -0400 NIFI-13203: Include s3.url attribute in FetchS3Object, PutS3Object This closes #8795 Signed-off-by: Joseph Witt <joew...@apache.org> --- .../java/org/apache/nifi/processors/aws/s3/FetchS3Object.java | 8 ++++---- .../main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java | 8 ++++---- .../java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java | 9 ++++----- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 21f6371c61..7d5ae411f3 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -70,6 +70,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"}) @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile") @WritesAttributes({ + @WritesAttribute(attribute = "s3.url", description = "The URL that can be used to access the S3 object"), @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), @WritesAttribute(attribute = "path", description = "The path of the file"), @WritesAttribute(attribute = "absolute.path", description = "The path of the file"), @@ -439,12 +440,11 @@ public class FetchS3Object extends AbstractS3Processor { throw ffae; } - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } + final String url = client.getResourceUrl(bucket, key); + attributes.put("s3.url", url); + flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); - final String url = client.getResourceUrl(bucket, key); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); session.getProvenanceReporter().fetch(flowFile, url, transferMillis); diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 9237f77774..80c3645f7d 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -105,6 +105,7 @@ import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") @WritesAttributes({ + @WritesAttribute(attribute = "s3.url", description = "The URL that can be used to access the S3 object"), @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"), @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"), @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"), @@ -857,12 +858,11 @@ public class PutS3Object extends AbstractS3Processor { throw e; } - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } + final String url = s3.getResourceUrl(bucket, key); + attributes.put("s3.url", url); + flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); - final String url = s3.getResourceUrl(bucket, key); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, url, millis); diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 7b983a23aa..bc1d533ea1 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -449,9 +449,9 @@ public class ITPutS3Object extends AbstractS3IT { runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME); runner.setProperty(PutS3Object.KEY, "${filename}"); - Map<String, String> attribs = new HashMap<>(); - attribs.put(CoreAttributes.FILENAME.key(), PROV1_FILE); - runner.enqueue("prov1 contents".getBytes(), attribs); + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), PROV1_FILE); + runner.enqueue("prov1 contents".getBytes(), attributes); runner.assertValid(); runner.run(); @@ -461,12 +461,11 @@ public class ITPutS3Object extends AbstractS3IT { final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents(); assertEquals(1, provenanceEvents.size()); - ProvenanceEventRecord provRec1 = provenanceEvents.get(0); + ProvenanceEventRecord provRec1 = provenanceEvents.getFirst(); assertEquals(ProvenanceEventType.SEND, provRec1.getEventType()); assertEquals(runner.getProcessor().getIdentifier(), provRec1.getComponentId()); String targetUri = getClient().getUrl(BUCKET_NAME, PROV1_FILE).toString(); assertEquals(targetUri, provRec1.getTransitUri()); - assertEquals(8, provRec1.getUpdatedAttributes().size()); assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY)); }