dmvk commented on a change in pull request #18686:
URL: https://github.com/apache/flink/pull/18686#discussion_r803419819
##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -99,25 +99,41 @@ public Properties getHostProperties() {
     }
 
     /** Returns the client to access the container from outside the docker network. */
-    public KinesisAsyncClient getContainerClient() throws URISyntaxException {
-        return getClient(getContainerEndpointUrl());
+    public KinesisAsyncClient getContainerClient(SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
+        return getClient(getContainerEndpointUrl(), httpClient);
     }
 
-    /** Returns the client to access the host from inside the docker network. */
+    /** Returns the client with the default async http client. */
     public KinesisAsyncClient getHostClient() throws URISyntaxException {
-        return getClient(getHostEndpointUrl());
+        return getHostClient(buildSdkAsyncHttpClient());

Review comment:
   How do you intend to close the HTTP client? From what I've seen with Firehose, the wrapping client doesn't seem to close the underlying resource.

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -99,25 +99,41 @@ public Properties getHostProperties() {
     }
 
     /** Returns the client to access the container from outside the docker network. */
-    public KinesisAsyncClient getContainerClient() throws URISyntaxException {
-        return getClient(getContainerEndpointUrl());
+    public KinesisAsyncClient getContainerClient(SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
+        return getClient(getContainerEndpointUrl(), httpClient);
     }
 
-    /** Returns the client to access the host from inside the docker network. */
+    /** Returns the client with the default async http client. */
     public KinesisAsyncClient getHostClient() throws URISyntaxException {
-        return getClient(getHostEndpointUrl());
+        return getHostClient(buildSdkAsyncHttpClient());
     }
 
-    public KinesisAsyncClient getClient(String endPoint) throws URISyntaxException {
+    /** Returns the client to access the host from inside the docker network. */
+    public KinesisAsyncClient getHostClient(SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
+        return getClient(getHostEndpointUrl(), httpClient);
+    }
+
+    public KinesisAsyncClient getClient(String endPoint, SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
         return KinesisAsyncClient.builder()
                 .endpointOverride(new URI(endPoint))
                 .region(REGION)
                 .credentialsProvider(
                         () -> AwsBasicCredentials.create(getAccessKey(), getSecretKey()))
-                .httpClient(buildSdkAsyncHttpClient())
+                .httpClient(httpClient)
                 .build();
     }
 
+    public SdkAsyncHttpClient buildSdkAsyncHttpClient() {

Review comment:
   There is so much duplicated code already; why can't we reuse `org.apache.flink.connector.aws.testutils.AWSServicesTestUtils#createHttpClient` here?

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -99,25 +99,41 @@ public Properties getHostProperties() {
     }
 
     /** Returns the client to access the container from outside the docker network. */
-    public KinesisAsyncClient getContainerClient() throws URISyntaxException {
-        return getClient(getContainerEndpointUrl());
+    public KinesisAsyncClient getContainerClient(SdkAsyncHttpClient httpClient)

Review comment:
   I'd suggest renaming the methods to start with a `createXXX` prefix, so it's explicit that the returned clients need to be closed and that EVERY SINGLE CALL to them creates a new resource.
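
To illustrate the ownership concern raised in the comments above, here is a minimal sketch, not the PR's code. `FakeHttpClient`, `FakeServiceClient`, and `ClientFactorySketch` are hypothetical stand-ins for `SdkAsyncHttpClient`, `KinesisAsyncClient`, and the container test utility. The wrapper's `close()` is written on the assumption that it leaves an injected HTTP client open, as described for Firehose.

```java
/** Hypothetical stand-in for SdkAsyncHttpClient; not the AWS SDK type. */
final class FakeHttpClient implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical stand-in for a service client built around an injected HTTP client. */
final class FakeServiceClient implements AutoCloseable {
    private final FakeHttpClient httpClient;
    private boolean closed;

    FakeServiceClient(FakeHttpClient httpClient) {
        this.httpClient = httpClient;
    }

    boolean isClosed() {
        return closed;
    }

    /** Assumption: closing the wrapper does NOT close the injected HTTP client. */
    @Override
    public void close() {
        closed = true;
    }
}

final class ClientFactorySketch {
    /** The "create" prefix signals that every call allocates a new resource the caller must close. */
    static FakeHttpClient createHttpClient() {
        return new FakeHttpClient();
    }

    static FakeServiceClient createHostClient(FakeHttpClient httpClient) {
        return new FakeServiceClient(httpClient);
    }

    public static void main(String[] args) {
        // Closing only the wrapper leaves the injected HTTP client open.
        FakeHttpClient leaked = createHttpClient();
        FakeServiceClient wrapper = createHostClient(leaked);
        wrapper.close();
        System.out.println("HTTP client closed by wrapper: " + leaked.isClosed());

        // try-with-resources closes in reverse declaration order:
        // the service client first, then the HTTP client it depends on.
        FakeHttpClient owned = createHttpClient();
        try (FakeHttpClient http = owned;
                FakeServiceClient client = createHostClient(http)) {
            System.out.println("Service client open inside the block: " + !client.isClosed());
        }
        System.out.println("HTTP client closed by its owner: " + owned.isClosed());
    }
}
```

With this shape the caller that invokes `createHttpClient()` is the only party responsible for closing it, which is what the `createXXX` naming is meant to make obvious.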

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
##########
@@ -84,12 +86,15 @@ public void setUp() throws Exception {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
 
-        kinesisClient = KINESALITE.getHostClient();
+        httpClient = KINESALITE.buildSdkAsyncHttpClient();
+        kinesisClient = KINESALITE.getHostClient(httpClient);
     }
 
     @After
     public void teardown() {
         System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+        httpClient.close();
+        kinesisClient.close();

Review comment:
   Use `org.apache.flink.connector.aws.util.AWSGeneralUtil#closeResources` here.