dmvk commented on a change in pull request #18686:
URL: https://github.com/apache/flink/pull/18686#discussion_r803419819



##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -99,25 +99,41 @@ public Properties getHostProperties() {
     }
 
     /** Returns the client to access the container from outside the docker network. */
-    public KinesisAsyncClient getContainerClient() throws URISyntaxException {
-        return getClient(getContainerEndpointUrl());
+    public KinesisAsyncClient getContainerClient(SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
+        return getClient(getContainerEndpointUrl(), httpClient);
     }
 
-    /** Returns the client to access the host from inside the docker network. */
+    /** Returns the client with the default async http client. */
     public KinesisAsyncClient getHostClient() throws URISyntaxException {
-        return getClient(getHostEndpointUrl());
+        return getHostClient(buildSdkAsyncHttpClient());

Review comment:
       How do you intend to close the http client? From what I've seen with 
Firehose, the wrapping client doesn't seem to close the underlying resource.
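
       For context on the question above: as far as I can tell, the AWS SDK for Java v2 documents that an HTTP client instance handed to the builder via `httpClient(...)` stays the caller's responsibility and is not closed when the service client is closed. The sketch below only illustrates that ownership rule. `FakeHttpClient`, `FakeServiceClient` and `createServiceClient` are hypothetical stand-ins, not SDK or Flink classes.

```java
// Hypothetical stand-in types that mimic the ownership rule; not AWS SDK classes.
final class HttpClientOwnershipSketch {

    /** Stand-in for an HTTP client that holds connections or threads. */
    static final class FakeHttpClient implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Stand-in for a service client built on a caller-supplied HTTP client. */
    static final class FakeServiceClient implements AutoCloseable {
        private final FakeHttpClient httpClient;
        private boolean closed;

        FakeServiceClient(FakeHttpClient httpClient) {
            this.httpClient = httpClient;
        }

        FakeHttpClient httpClient() {
            return httpClient;
        }

        // Closing the wrapper deliberately leaves the supplied HTTP client open.
        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Every call returns a new resource that the caller has to close. */
    static FakeServiceClient createServiceClient(FakeHttpClient httpClient) {
        return new FakeServiceClient(httpClient);
    }

    public static void main(String[] args) {
        FakeHttpClient httpClient = new FakeHttpClient();
        try (FakeServiceClient client = createServiceClient(httpClient)) {
            // The test body would use the client here.
            System.out.println("same http client: " + (client.httpClient() == httpClient));
        }
        System.out.println("closed after wrapper close: " + httpClient.isClosed());
        httpClient.close(); // the caller still owns the HTTP client
        System.out.println("closed after explicit close: " + httpClient.isClosed());
    }
}
```

       Under that assumption, a test that builds its own HTTP client has to close both the service client and the HTTP client in its teardown.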

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -99,25 +99,41 @@ public Properties getHostProperties() {
     }
 
     /** Returns the client to access the container from outside the docker network. */
-    public KinesisAsyncClient getContainerClient() throws URISyntaxException {
-        return getClient(getContainerEndpointUrl());
+    public KinesisAsyncClient getContainerClient(SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
+        return getClient(getContainerEndpointUrl(), httpClient);
     }
 
-    /** Returns the client to access the host from inside the docker network. */
+    /** Returns the client with the default async http client. */
     public KinesisAsyncClient getHostClient() throws URISyntaxException {
-        return getClient(getHostEndpointUrl());
+        return getHostClient(buildSdkAsyncHttpClient());
     }
 
-    public KinesisAsyncClient getClient(String endPoint) throws URISyntaxException {
+    /** Returns the client to access the host from inside the docker network. */
+    public KinesisAsyncClient getHostClient(SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
+        return getClient(getHostEndpointUrl(), httpClient);
+    }
+
+    public KinesisAsyncClient getClient(String endPoint, SdkAsyncHttpClient httpClient)
+            throws URISyntaxException {
         return KinesisAsyncClient.builder()
                 .endpointOverride(new URI(endPoint))
                 .region(REGION)
                 .credentialsProvider(
                         () -> AwsBasicCredentials.create(getAccessKey(), getSecretKey()))
-                .httpClient(buildSdkAsyncHttpClient())
+                .httpClient(httpClient)
                 .build();
     }
 
+    public SdkAsyncHttpClient buildSdkAsyncHttpClient() {

Review comment:
       There is so much duplicated code already. Why can't we reuse 
`org.apache.flink.connector.aws.testutils.AWSServicesTestUtils#createHttpClient` 
here?

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -99,25 +99,41 @@ public Properties getHostProperties() {
     }
 
     /** Returns the client to access the container from outside the docker network. */
-    public KinesisAsyncClient getContainerClient() throws URISyntaxException {
-        return getClient(getContainerEndpointUrl());
+    public KinesisAsyncClient getContainerClient(SdkAsyncHttpClient httpClient)

Review comment:
       I'd suggest renaming the methods to start with a `createXXX` prefix, so 
it's explicit that the returned clients need to be closed and that EVERY SINGLE 
CALL to them creates a new resource.

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
##########
@@ -84,12 +86,15 @@ public void setUp() throws Exception {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
 
-        kinesisClient = KINESALITE.getHostClient();
+        httpClient = KINESALITE.buildSdkAsyncHttpClient();
+        kinesisClient = KINESALITE.getHostClient(httpClient);
     }
 
     @After
     public void teardown() {
         System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+        httpClient.close();
+        kinesisClient.close();

Review comment:
       Use `org.apache.flink.connector.aws.util.AWSGeneralUtil#closeResources` 
here instead of closing the two clients by hand.
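
       One reason to prefer a shared close helper in the teardown above: with two sequential `close()` calls, an exception from the first one skips the second and leaks that resource. The sketch below shows the general close-everything pattern. `CloseAllSketch` and `closeAll` are hypothetical names used for illustration, and this is not the actual implementation or signature of `AWSGeneralUtil#closeResources`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not Flink's AWSGeneralUtil.
final class CloseAllSketch {

    /**
     * Closes every non-null resource even if an earlier one throws.
     * Rethrows the first failure and attaches later ones as suppressed.
     */
    static void closeAll(AutoCloseable... resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        // Stand-ins for the http client and the Kinesis client of the test.
        AutoCloseable failingHttpClient = () -> {
            closed.add("httpClient");
            throw new IllegalStateException("close failed");
        };
        AutoCloseable kinesisClient = () -> closed.add("kinesisClient");

        try {
            closeAll(failingHttpClient, kinesisClient);
        } catch (Exception e) {
            System.out.println("rethrown: " + e.getMessage());
        }
        // Both resources were visited despite the first failure.
        System.out.println("close attempted on: " + closed);
    }
}
```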




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

