This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3eca2ba [FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests. 3eca2ba is described below commit 3eca2ba3abb9a02109fe3d21066ea23eb869d6ff Author: Ahmed Hamdy <vah...@amazon.com> AuthorDate: Mon Feb 21 10:30:31 2022 +0000 [FLINK-26064][connector/firehose][connector/kinesis] Using separate event loop groups for firehose and kinesis IT tests. --- .../java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java | 9 +++++++-- .../flink/connector/aws/testutils/AWSServicesTestUtils.java | 7 ++++++- .../flink/connectors/kinesis/testutils/KinesaliteContainer.java | 2 ++ .../flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java | 2 -- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java index 989f56e..dfc78d8 100644 --- a/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java +++ b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java @@ -227,6 +227,12 @@ public class AWSGeneralUtil { } public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configProperties) { + return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder()); + } + + public static SdkAsyncHttpClient createAsyncHttpClient( + final Properties configProperties, + final NettyNioAsyncHttpClient.Builder httpClientBuilder) { final AttributeMap.Builder clientConfiguration = AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true); @@ -262,8 +268,7 @@ public class AWSGeneralUtil { protocol -> clientConfiguration.put( SdkHttpConfigurationOption.PROTOCOL, protocol)); - return createAsyncHttpClient( - clientConfiguration.build(), NettyNioAsyncHttpClient.builder()); + return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder); } public static SdkAsyncHttpClient createAsyncHttpClient( diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java index 79fbe70..e0b53fd 100644 --- a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java +++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java @@ -26,6 +26,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.iam.IamAsyncClient; import software.amazon.awssdk.services.iam.model.CreateRoleRequest; @@ -96,7 +98,10 @@ public class AWSServicesTestUtils { } public static SdkAsyncHttpClient createHttpClient(String endpoint) { - return AWSGeneralUtil.createAsyncHttpClient(createConfig(endpoint)); + return AWSGeneralUtil.createAsyncHttpClient( + createConfig(endpoint), + NettyNioAsyncHttpClient.builder() + .eventLoopGroupBuilder(SdkEventLoopGroup.builder())); } public static void createBucket(S3AsyncClient s3Client, String bucketName) diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java index 934e6bc..f76555b 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java +++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.core.SdkSystemSetting; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; @@ -176,6 +177,7 @@ public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> { private SdkAsyncHttpClient buildSdkAsyncHttpClient() { return NettyNioAsyncHttpClient.builder() + .eventLoopGroupBuilder(SdkEventLoopGroup.builder()) .buildWithDefaults( AttributeMap.builder() .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true) diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java index 5170470..0e37f15 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java @@ -30,7 +30,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import org.junit.After; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +57,6 @@ import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehose import static org.assertj.core.api.Assertions.assertThat; /** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */ -@Ignore("FLINK-26064") public class KinesisFirehoseSinkITCase { private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);