dannycranmer commented on a change in pull request #18553: URL: https://github.com/apache/flink/pull/18553#discussion_r802466514
########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java ########## @@ -78,42 +76,36 @@ @Before public void setup() throws Exception { System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); - httpClient = AWSServicesTestUtils.createHttpClient(mockFirehoseContainer.getEndpoint()); - s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient); - firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient); - iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient); + s3HttpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + firehoseHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + iamHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), s3HttpClient); + firehoseAsyncClient = + getFirehoseClient(mockFirehoseContainer.getEndpoint(), firehoseHttpClient); + iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), iamHttpClient); + env = StreamExecutionEnvironment.getExecutionEnvironment(); Review comment: Why did you split out to separate HTTP clients? 
########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java ########## @@ -73,4 +81,22 @@ public static void createDeliveryStream( firehoseAsyncClient.createDeliveryStream(request); deliveryStream.get(); } + + public static DataStream<String> getSampleDataGenerator( + StreamExecutionEnvironment env, int endValue) { + ObjectMapper mapper = new ObjectMapper(); + return env.fromSequence(1, endValue) + .map(Object::toString) + .returns(String.class) + .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data))); + } + + public static List<String> getSampleDataGenerated(int endValue) throws JsonProcessingException { Review comment: nit: This method name does not make sense to me; should it be `getSampleData`/`generateSampleData`/`createSampleData`? ########## File path: flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java ########## @@ -369,4 +367,35 @@ public static void closeResources(SdkAutoCloseable... resources) { throw exception; } } + + public static void validateWebIdentityTokenFileCredentialsProvider(Properties config) { + validateCredentialProvider(config); + try { + CredentialProvider credentialProviderType = + getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER); + if (credentialProviderType.equals(WEB_IDENTITY_TOKEN)) { + getCredentialsProvider(config).resolveCredentials(); Review comment: Where is this code executed? Is it on the client or the Task Manager? If on the client, then the client will need AWS creds; this is not ideal if the job is being submitted from a host with limited permissions. ########## File path: flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java ########## @@ -369,4 +367,35 @@ public static void closeResources(SdkAutoCloseable... 
resources) { throw exception; } } + + public static void validateWebIdentityTokenFileCredentialsProvider(Properties config) { + validateCredentialProvider(config); + try { + CredentialProvider credentialProviderType = + getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER); + if (credentialProviderType.equals(WEB_IDENTITY_TOKEN)) { + getCredentialsProvider(config).resolveCredentials(); + } + } catch (Throwable e) { Review comment: Can you catch something more specific than Throwable here? Why catch Throwable? This is usually considered bad practice. ########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java ########## @@ -65,6 +70,8 @@ private static SdkAsyncHttpClient createHttpClient(Properties firehoseClientProp private static FirehoseAsyncClient createFirehoseClient( Properties firehoseClientProperties, SdkAsyncHttpClient httpClient) { + AWSGeneralUtil.validateAwsConfiguration(firehoseClientProperties); + AWSGeneralUtil.validateWebIdentityTokenFileCredentialsProvider(firehoseClientProperties); Review comment: Why are we calling an explicit validation for WebIdentityToken here? Why not a generic top-level one? This does not scale if more and more identity providers are added. Firehose should not care explicitly about which one to validate. 
########## File path: flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java ########## @@ -78,42 +76,36 @@ @Before public void setup() throws Exception { System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); - httpClient = AWSServicesTestUtils.createHttpClient(mockFirehoseContainer.getEndpoint()); - s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient); - firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient); - iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient); + s3HttpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + firehoseHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + iamHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint()); + s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), s3HttpClient); + firehoseAsyncClient = + getFirehoseClient(mockFirehoseContainer.getEndpoint(), firehoseHttpClient); + iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(), iamHttpClient); + env = StreamExecutionEnvironment.getExecutionEnvironment(); } @After public void teardown() { System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); - AWSGeneralUtil.closeResources( - httpClient, s3AsyncClient, firehoseAsyncClient, iamAsyncClient); + s3HttpClient.close(); + firehoseHttpClient.close(); + iamHttpClient.close(); + s3AsyncClient.close(); + firehoseAsyncClient.close(); + iamAsyncClient.close(); Review comment: Why not use the util to close these? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org