dannycranmer commented on a change in pull request #18553:
URL: https://github.com/apache/flink/pull/18553#discussion_r802466514



##########
File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
##########
@@ -78,42 +76,36 @@
     @Before
     public void setup() throws Exception {
         System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
-        httpClient = AWSServicesTestUtils.createHttpClient(mockFirehoseContainer.getEndpoint());
-        s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient);
-        firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient);
-        iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient);
+        s3HttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+        firehoseHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+        iamHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+        s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), s3HttpClient);
+        firehoseAsyncClient =
+                getFirehoseClient(mockFirehoseContainer.getEndpoint(), firehoseHttpClient);
+        iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), iamHttpClient);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();

Review comment:
       Why did you split out to separate HTTP clients?

##########
File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
##########
@@ -73,4 +81,22 @@ public static void createDeliveryStream(
                 firehoseAsyncClient.createDeliveryStream(request);
         deliveryStream.get();
     }
+
+    public static DataStream<String> getSampleDataGenerator(
+            StreamExecutionEnvironment env, int endValue) {
+        ObjectMapper mapper = new ObjectMapper();
+        return env.fromSequence(1, endValue)
+                .map(Object::toString)
+                .returns(String.class)
+                .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+    }
+
+    public static List<String> getSampleDataGenerated(int endValue) throws JsonProcessingException {

Review comment:
       nit: This method name does not make sense to me. Should it be 
`getSampleData`/`generateSampleData`/`createSampleData`?

##########
File path: flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
##########
@@ -369,4 +367,35 @@ public static void closeResources(SdkAutoCloseable... resources) {
             throw exception;
         }
     }
+
+    public static void validateWebIdentityTokenFileCredentialsProvider(Properties config) {
+        validateCredentialProvider(config);
+        try {
+            CredentialProvider credentialProviderType =
+                    getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+            if (credentialProviderType.equals(WEB_IDENTITY_TOKEN)) {
+                getCredentialsProvider(config).resolveCredentials();

Review comment:
       Where is this code executed: on the client or on the Task Manager? If it 
runs on the client, then the client will need AWS creds. This is not ideal if 
the job is being submitted from a host with limited permissions.
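
If it does run on the client, one option might be to keep only cheap configuration checks at construction time and defer the actual credential lookup until the writer is opened on the Task Manager. A minimal sketch of that split, assuming a hypothetical `DeferredCredentialCheckSketch` class and a plain `Supplier<String>` standing in for the real credentials provider (neither is part of this PR or of Flink):

```java
import java.util.function.Supplier;

// Hypothetical illustration only; names and types are not from the PR.
class DeferredCredentialCheckSketch {
    private final Supplier<String> credentialResolver;
    private String credentials; // stays null until open() is called

    // Construction (e.g. at job submission) only checks that a resolver was
    // configured; it does not touch any credentials.
    DeferredCredentialCheckSketch(Supplier<String> credentialResolver) {
        if (credentialResolver == null) {
            throw new IllegalArgumentException("A credential resolver must be configured");
        }
        this.credentialResolver = credentialResolver;
    }

    // Called where the work actually runs; this is the first point at which
    // credentials are resolved.
    void open() {
        credentials = credentialResolver.get();
    }

    boolean hasCredentials() {
        return credentials != null;
    }
}
```

With this shape, a submitting host without AWS permissions can still build the job, and a misconfigured provider fails when `open()` runs instead.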

##########
File path: flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
##########
@@ -369,4 +367,35 @@ public static void closeResources(SdkAutoCloseable... resources) {
             throw exception;
         }
     }
+
+    public static void validateWebIdentityTokenFileCredentialsProvider(Properties config) {
+        validateCredentialProvider(config);
+        try {
+            CredentialProvider credentialProviderType =
+                    getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+            if (credentialProviderType.equals(WEB_IDENTITY_TOKEN)) {
+                getCredentialsProvider(config).resolveCredentials();
+            }
+        } catch (Throwable e) {

Review comment:
       Can you catch something more specific than Throwable here? Why catch 
Throwable? This is usually considered bad practice.
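
To illustrate the difference, here is a minimal sketch that translates only the expected failure and lets everything else propagate. `CredentialResolutionException` is a hypothetical stand-in; the exception type actually thrown by `resolveCredentials()` would need to be checked against the SDK.

```java
// Hypothetical illustration only; names and types are not from the PR.
class SpecificCatchSketch {

    /** Hypothetical stand-in for the exception thrown when credentials cannot be resolved. */
    static class CredentialResolutionException extends RuntimeException {
        CredentialResolutionException(String message) {
            super(message);
        }
    }

    /**
     * Runs the credential lookup and wraps only the expected failure in an
     * IllegalStateException. Errors (e.g. OutOfMemoryError) and unrelated
     * runtime exceptions are not caught and propagate unchanged.
     */
    static void validate(Runnable resolveCredentials) {
        try {
            resolveCredentials.run();
        } catch (CredentialResolutionException e) {
            throw new IllegalStateException("Invalid credentials provider configuration", e);
        }
    }
}
```

Catching `Throwable` would also swallow JVM errors and programming bugs, and report them as a configuration problem.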

##########
File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
##########
@@ -65,6 +70,8 @@ private static SdkAsyncHttpClient createHttpClient(Properties firehoseClientProp
 
     private static FirehoseAsyncClient createFirehoseClient(
             Properties firehoseClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsConfiguration(firehoseClientProperties);
+        AWSGeneralUtil.validateWebIdentityTokenFileCredentialsProvider(firehoseClientProperties);

Review comment:
       Why are we calling an explicit validation for WebIdentityToken here? Why 
not a generic top-level one? This does not scale as more identity providers 
are added. Firehose should not care explicitly about which one to validate.
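
One possible shape for a generic entry point, as a rough sketch: the caller invokes a single method, and provider-specific checks are looked up by provider type. The class name, enum values, property keys and the token-file check below are all hypothetical and are not the existing `AWSGeneralUtil` or `AWSConfigConstants` API.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

// Hypothetical illustration only; names, keys and checks are not from the PR.
class CredentialValidationSketch {

    enum ProviderType { BASIC, WEB_IDENTITY_TOKEN }

    // Hypothetical property keys used only by this sketch.
    static final String PROVIDER_KEY = "sketch.credentials.provider";
    static final String TOKEN_FILE_KEY = "sketch.credentials.token.file";

    // Provider-specific checks are registered here; callers never see them.
    private static final Map<ProviderType, Consumer<Properties>> VALIDATORS =
            new EnumMap<>(ProviderType.class);

    static {
        VALIDATORS.put(
                ProviderType.WEB_IDENTITY_TOKEN,
                CredentialValidationSketch::validateWebIdentityToken);
    }

    /** Single entry point: a sink calls this without knowing the provider type. */
    static void validateCredentials(Properties config) {
        ProviderType type = ProviderType.valueOf(config.getProperty(PROVIDER_KEY, "BASIC"));
        // Providers without a registered validator pass through unchanged.
        VALIDATORS.getOrDefault(type, c -> {}).accept(config);
    }

    private static void validateWebIdentityToken(Properties config) {
        if (config.getProperty(TOKEN_FILE_KEY) == null) {
            throw new IllegalArgumentException("Web identity token file must be configured");
        }
    }
}
```

Adding another provider then means registering one more validator in the map, with no change to Firehose or any other sink.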

##########
File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
##########
@@ -78,42 +76,36 @@
     @Before
     public void setup() throws Exception {
         System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
-        httpClient = AWSServicesTestUtils.createHttpClient(mockFirehoseContainer.getEndpoint());
-        s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient);
-        firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient);
-        iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient);
+        s3HttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+        firehoseHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+        iamHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+        s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), s3HttpClient);
+        firehoseAsyncClient =
+                getFirehoseClient(mockFirehoseContainer.getEndpoint(), firehoseHttpClient);
+        iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), iamHttpClient);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
     }
 
     @After
     public void teardown() {
         System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
-        AWSGeneralUtil.closeResources(
-                httpClient, s3AsyncClient, firehoseAsyncClient, iamAsyncClient);
+        s3HttpClient.close();
+        firehoseHttpClient.close();
+        iamHttpClient.close();
+        s3AsyncClient.close();
+        firehoseAsyncClient.close();
+        iamAsyncClient.close();

Review comment:
       Why not use the util to close these?
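
For context on why a helper tends to be safer than consecutive `close()` calls: if one `close()` throws, the calls after it never run. Below is a minimal sketch of a close-all helper, assuming a hypothetical `closeAll` method; it is not the actual implementation of `AWSGeneralUtil.closeResources`, of which the diff above only shows the final `throw exception;`.

```java
// Hypothetical illustration only; not the AWSGeneralUtil implementation.
class CloseResourcesSketch {

    /**
     * Attempts to close every resource, even if an earlier one fails.
     * The first failure becomes the cause of the thrown exception; later
     * failures are attached to it as suppressed exceptions.
     */
    static void closeAll(AutoCloseable... resources) {
        RuntimeException failure = null;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // nothing to close, e.g. setup failed part-way
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = new RuntimeException("Failed to close resources", e);
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```

With six sequential `close()` calls in `teardown()`, an exception from the first would leave the other five clients open.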




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


