This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new b681cfc  [FLINK-31986][Connectors/Kinesis] Setup integration tests for source
b681cfc is described below

commit b681cfcd372a44a34c4e6c868cb8a69d50aac67c
Author: Elphas Toringepi <etori...@amazon.co.uk>
AuthorDate: Fri Sep 27 17:14:50 2024 +0100

    [FLINK-31986][Connectors/Kinesis] Setup integration tests for source
---
 .../flink/connector/aws/util/AWSGeneralUtil.java   |  21 +-
 .../aws/testutils/AWSServicesTestUtils.java        |   6 +-
 .../connector/aws/util/AWSClientUtilTest.java      |  20 ++
 .../flink-connector-aws-kinesis-streams/pom.xml    |   6 +
 .../kinesis/source/KinesisStreamsSource.java       |   9 +-
 .../kinesis/source/KinesisStreamsSourceITCase.java | 220 +++++++++++++++++++++
 6 files changed, 272 insertions(+), 10 deletions(-)

diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
index cea8248..5325b43 100644
--- a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -297,9 +297,8 @@ public class AWSGeneralUtil {
         return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder());
     }
 
-    public static SdkAsyncHttpClient createAsyncHttpClient(
-            final Properties configProperties,
-            final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+    @VisibleForTesting
+    static AttributeMap getSdkHttpConfigurationOptions(final Properties configProperties) {
         final AttributeMap.Builder clientConfiguration =
                 AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
 
@@ -335,7 +334,15 @@ public class AWSGeneralUtil {
                         protocol ->
                                 clientConfiguration.put(
                                         SdkHttpConfigurationOption.PROTOCOL, protocol));
-        return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder);
+
+        return clientConfiguration.build();
+    }
+
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final Properties configProperties,
+            final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        return createAsyncHttpClient(
+                getSdkHttpConfigurationOptions(configProperties), httpClientBuilder);
     }
 
     public static SdkAsyncHttpClient createAsyncHttpClient(
@@ -355,6 +362,12 @@ public class AWSGeneralUtil {
         return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
     }
 
+    public static SdkHttpClient createSyncHttpClient(
+            final Properties configProperties, final ApacheHttpClient.Builder httpClientBuilder) {
+        return createSyncHttpClient(
+                getSdkHttpConfigurationOptions(configProperties), httpClientBuilder);
+    }
+
     public static SdkHttpClient createSyncHttpClient(
             final AttributeMap config, final ApacheHttpClient.Builder httpClientBuilder) {
         httpClientBuilder.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT);
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 207f567..5bea73b 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -115,7 +115,11 @@ public class AWSServicesTestUtils {
     }
 
     public static void createIAMRole(IamClient iam, String roleName) {
-        CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();
+        CreateRoleRequest request =
+                CreateRoleRequest.builder()
+                        .roleName(roleName)
+                        .assumeRolePolicyDocument("{}")
+                        .build();
 
         iam.createRole(request);
     }
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
index 0c2c2a9..67331e3 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.connector.aws.util;
 
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -28,7 +30,9 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
 import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.http.Protocol;
 import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
@@ -50,6 +54,7 @@ import java.util.Properties;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
 import static org.apache.flink.connector.aws.util.AWSClientUtil.formatFlinkUserAgentPrefix;
+import static org.apache.flink.connector.aws.util.AWSGeneralUtil.getSdkHttpConfigurationOptions;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
@@ -204,6 +209,21 @@ class AWSClientUtilTest {
                 .isTrue();
     }
 
+    @Test
+    void testGetSdkHttpConfigurationOptions() {
+        Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
+        properties.setProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true");
+        properties.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+        AttributeMap options = getSdkHttpConfigurationOptions(properties);
+
+        assertThat(options.get(SdkHttpConfigurationOption.TCP_KEEPALIVE).booleanValue()).isTrue();
+        assertThat(options.containsKey(SdkHttpConfigurationOption.MAX_CONNECTIONS)).isFalse();
+        assertThat(options.containsKey(SdkHttpConfigurationOption.READ_TIMEOUT)).isFalse();
+        assertThat(options.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue())
+                .isTrue();
+        assertThat(options.get(SdkHttpConfigurationOption.PROTOCOL)).isEqualTo(Protocol.HTTP1_1);
+    }
+
     @Test
     void testCreateKinesisAsyncClientWithEndpointOverride() {
         Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
index 639dbbb..dc3f6a1 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -122,6 +122,12 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.datatype</groupId>
             <artifactId>jackson-datatype-jsr310</artifactId>
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index 1ba3704..7838357 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -57,7 +57,6 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.retries.api.BackoffStrategy;
 import software.amazon.awssdk.retries.api.RetryStrategy;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
 import java.util.Map;
@@ -193,10 +192,6 @@ public class KinesisStreamsSource<T>
     }
 
     private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
-        SdkHttpClient httpClient =
-                AWSGeneralUtil.createSyncHttpClient(
-                        AttributeMap.builder().build(), ApacheHttpClient.builder());
-
         String region =
                 AWSGeneralUtil.getRegionFromArn(streamArn)
                         .orElseThrow(
@@ -219,6 +214,10 @@ public class KinesisStreamsSource<T>
                                                 AWSConfigOptions
                                                         .RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)));
 
+        SdkHttpClient httpClient =
+                AWSGeneralUtil.createSyncHttpClient(
+                        kinesisClientProperties, ApacheHttpClient.builder());
+
         AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
         KinesisClient kinesisClient =
                 AWSClientUtil.createAwsSyncClient(
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
new file mode 100644
index 0000000..4a2adec
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
@@ -0,0 +1,220 @@
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** IT cases for using {@code KinesisStreamsSource} using a localstack container. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class KinesisStreamsSourceITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceITCase.class);
+    private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";
+
+    @Container
+    private static final LocalstackContainer MOCK_KINESIS_CONTAINER =
+            new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION));
+
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private KinesisClient kinesisClient;
+
+    @BeforeEach
+    void setUp() {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        kinesisClient =
+                AWSServicesTestUtils.createAwsSyncClient(
+                        MOCK_KINESIS_CONTAINER.getEndpoint(), httpClient, KinesisClient.builder());
+    }
+
+    @AfterEach
+    void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+    }
+
+    @Test
+    void nonExistentStreamShouldResultInFailure() {
+        Assertions.assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .localstackStreamName("stream-exists")
+                                        .withSourceConnectionStreamArn(
+                                                "arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists")
+                                        .runScenario())
+                .withStackTraceContaining(
+                        "Stream arn arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists not found");
+    }
+
+    @Test
+    void validStreamIsConsumed() throws Exception {
+        new Scenario()
+                .localstackStreamName("valid-stream")
+                .withSourceConnectionStreamArn(
+                        "arn:aws:kinesis:ap-southeast-1:000000000000:stream/valid-stream")
+                .runScenario();
+    }
+
+    private Configuration getDefaultConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint());
+        configuration.setString(AWS_ACCESS_KEY_ID, "accessKeyId");
+        configuration.setString(AWS_SECRET_ACCESS_KEY, "secretAccessKey");
+        configuration.setString(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+        configuration.setString(TRUST_ALL_CERTIFICATES, "true");
+        configuration.setString(HTTP_PROTOCOL_VERSION, "HTTP1_1");
+        return configuration;
+    }
+
+    private class Scenario {
+        private final int expectedElements = 1000;
+        private String localstackStreamName = null;
+        private String sourceConnectionStreamArn;
+        private final Configuration configuration =
+                KinesisStreamsSourceITCase.this.getDefaultConfiguration();
+
+        public void runScenario() throws Exception {
+            if (localstackStreamName != null) {
+                prepareStream(localstackStreamName);
+            }
+
+            putRecords(localstackStreamName, expectedElements);
+
+            KinesisStreamsSource<String> kdsSource =
+                    KinesisStreamsSource.<String>builder()
+                            .setStreamArn(sourceConnectionStreamArn)
+                            .setSourceConfig(configuration)
+                            .setDeserializationSchema(new SimpleStringSchema())
+                            .build();
+
+            List<String> result =
+                    env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
+                            .returns(TypeInformation.of(String.class))
+                            .executeAndCollect(expectedElements);
+
+            Assertions.assertThat(result.size()).isEqualTo(expectedElements);
+        }
+
+        public Scenario withSourceConnectionStreamArn(String sourceConnectionStreamArn) {
+            this.sourceConnectionStreamArn = sourceConnectionStreamArn;
+            return this;
+        }
+
+        public Scenario localstackStreamName(String localstackStreamName) {
+            this.localstackStreamName = localstackStreamName;
+            return this;
+        }
+
+        private void prepareStream(String streamName) throws Exception {
+            final RateLimiter rateLimiter =
+                    RateLimiterBuilder.newBuilder()
+                            .withRate(1, SECONDS)
+                            .withConstantThroughput()
+                            .build();
+
+            kinesisClient.createStream(
+                    CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
+
+            Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
+            while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) {
+                if (deadline.isOverdue()) {
+                    throw new RuntimeException("Failed to create stream within time");
+                }
+            }
+        }
+
+        private void putRecords(String streamName, int numRecords) {
+            List<byte[]> messages =
+                    IntStream.range(0, numRecords)
+                            .mapToObj(String::valueOf)
+                            .map(String::getBytes)
+                            .collect(Collectors.toList());
+
+            for (List<byte[]> partition : Lists.partition(messages, 500)) {
+                List<PutRecordsRequestEntry> entries =
+                        partition.stream()
+                                .map(
+                                        msg ->
+                                                PutRecordsRequestEntry.builder()
+                                                        .partitionKey("fakePartitionKey")
+                                                        .data(SdkBytes.fromByteArray(msg))
+                                                        .build())
+                                .collect(Collectors.toList());
+                PutRecordsRequest requests =
+                        PutRecordsRequest.builder().streamName(streamName).records(entries).build();
+                PutRecordsResponse putRecordResult = kinesisClient.putRecords(requests);
+                for (PutRecordsResultEntry result : putRecordResult.records()) {
+                    LOG.debug("Added record: {}", result.sequenceNumber());
+                }
+            }
+        }
+
+        private boolean streamExists(final String streamName) {
+            try {
+                return kinesisClient
+                                .describeStream(
+                                        DescribeStreamRequest.builder()
+                                                .streamName(streamName)
+                                                .build())
+                                .streamDescription()
+                                .streamStatus()
+                        == StreamStatus.ACTIVE;
+            } catch (Exception e) {
+                return false;
+            }
+        }
+    }
+}

Reply via email to