Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5171#discussion_r158166090 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java --- @@ -30,37 +30,44 @@ import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.auth.SystemPropertiesCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; -import com.amazonaws.regions.Region; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.Regions; -import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import java.util.Properties; /** * Some utilities specific to Amazon Web Service. */ public class AWSUtil { + /** Used for formatting Flink-specific user agent string when creating Kinesis client. */ + private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector"; /** - * Creates an Amazon Kinesis Client. + * Creates an AmazonKinesis client. * @param configProps configuration properties containing the access key, secret key, and region - * @return a new Amazon Kinesis Client + * @return a new AmazonKinesis client */ - public static AmazonKinesisClient createKinesisClient(Properties configProps) { + public static AmazonKinesis createKinesisClient(Properties configProps) { // set a Flink-specific user agent - ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); - awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + - " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector"); + ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig() + .withUserAgentPrefix(String.format(USER_AGENT_FORMAT, + EnvironmentInformation.getVersion(), + EnvironmentInformation.getRevisionInformation().commitId)); // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider - AmazonKinesisClient client = new AmazonKinesisClient( - AWSUtil.getCredentialsProvider(configProps), awsClientConfig); + AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard() + .withCredentials(AWSUtil.getCredentialsProvider(configProps)) + .withClientConfiguration(awsClientConfig) + .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))); - client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)))); if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) { - client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT)); + builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), + configProps.getProperty(AWSConfigConstants.AWS_REGION))); --- End diff -- Why does the endpoint configuration have a region now? For example, lets say a user wants to test the connector against a local Kinesis mock service at "localhost:1111". The user also originally was issuing against the regular AWS Kinesis service, at region "us-west-1". The users properties would be like - ``` configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-west-1"); configProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "localhost:1111"); ``` In the past, this would correctly redirect requests to "localhost:1111". With this change, is this also the case? Or do we actually need to call `new AwsClientBuilder.EndpointConfiguration(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)` instead (do not provide region in endpoint)?
---