This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ea48d22fba NIFI-12896 Add Endpoint Override URL property for PutSNS Processor
ea48d22fba is described below

commit ea48d22fba666cc5ec70db70ccd4a13da07acff8
Author: sujkim <suejki...@gmail.com>
AuthorDate: Mon Mar 18 15:58:04 2024 +0000

    NIFI-12896 Add Endpoint Override URL property for PutSNS Processor
    
    This closes #8529
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../org/apache/nifi/processors/aws/sns/PutSNS.java |   1 +
 .../apache/nifi/processors/aws/sns/ITPutSNS.java   | 118 +++++++++++++++++++++
 2 files changed, 119 insertions(+)

diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index 1450bbc24e..8365203ee7 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -130,6 +130,7 @@ public class PutSNS extends AbstractAwsSyncProcessor<SnsClient, SnsClientBuilder
             AWS_CREDENTIALS_PROVIDER_SERVICE,
             SSL_CONTEXT_SERVICE,
             TIMEOUT,
+            ENDPOINT_OVERRIDE,
             USE_JSON_STRUCTURE,
             CHARACTER_ENCODING,
             MESSAGEGROUPID,
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java
new file mode 100644
index 0000000000..e7d0d2e216
--- /dev/null
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sns;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
+import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Provides integration level testing with SNS resources emulated by a LocalStack container for {@link 
PutSNS} and requires additional configuration and resources to work.
+ */
+public class ITPutSNS {
+
+    private static final DockerImageName localstackImage = 
DockerImageName.parse("localstack/localstack:latest");
+
+    private static final LocalStackContainer localstack = new 
LocalStackContainer(localstackImage)
+            .withServices(LocalStackContainer.Service.SNS);
+
+    private static final String CREDENTIALS_FILE = 
"src/test/resources/mock-aws-credentials.properties";
+    private static String topicARN;
+    private static SnsClient client;
+
+    @BeforeAll
+    public static void setup() throws InterruptedException {
+        localstack.start();
+
+        client = SnsClient.builder()
+                .endpointOverride(localstack.getEndpoint())
+                .credentialsProvider(
+                        StaticCredentialsProvider.create(
+                                
AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())
+                        )
+                )
+                .region(Region.of(localstack.getRegion()))
+                .build();
+
+        final CreateTopicResponse response = 
client.createTopic(CreateTopicRequest.builder()
+                .name("SnsSystemTest")
+                .build());
+        assertTrue(response.sdkHttpResponse().isSuccessful());
+        topicARN = response.topicArn();
+    }
+
+    @AfterAll
+    public static void shutdown() {
+        client.close();
+        localstack.stop();
+    }
+
+    @Test
+    public void testPublish() throws IOException {
+        final TestRunner runner = initRunner(PutSNS.class);
+        AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), 
localstack.getSecretKey());
+        assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "1.txt");
+        runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testPublishWithCredentialsProviderService() throws Throwable {
+        final TestRunner runner = initRunner(PutSNS.class);
+        AuthUtils.enableCredentialsFile(runner, CREDENTIALS_FILE);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "1.txt");
+        runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
+    }
+
+    protected TestRunner initRunner(final Class<? extends Processor> 
processorClass) {
+        TestRunner runner = TestRunners.newTestRunner(processorClass);
+        runner.setProperty(AbstractAwsProcessor.REGION, 
localstack.getRegion());
+        runner.setProperty(AbstractAwsProcessor.ENDPOINT_OVERRIDE, 
localstack.getEndpointOverride(LocalStackContainer.Service.SNS).toString());
+        runner.setProperty(PutSNS.ARN, topicARN);
+        return runner;
+    }
+}
\ No newline at end of file

Reply via email to