GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5171#discussion_r158166090
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
    @@ -30,37 +30,44 @@
     import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
     import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
     import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    -import com.amazonaws.regions.Region;
    +import com.amazonaws.client.builder.AwsClientBuilder;
     import com.amazonaws.regions.Regions;
    -import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.AmazonKinesis;
    +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
     
     import java.util.Properties;
     
     /**
      * Some utilities specific to Amazon Web Service.
      */
     public class AWSUtil {
    +   /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
    +   private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
     
        /**
    -    * Creates an Amazon Kinesis Client.
    +    * Creates an AmazonKinesis client.
         * @param configProps configuration properties containing the access key, secret key, and region
    -    * @return a new Amazon Kinesis Client
    +    * @return a new AmazonKinesis client
         */
    -   public static AmazonKinesisClient createKinesisClient(Properties configProps) {
    +   public static AmazonKinesis createKinesisClient(Properties configProps) {
                // set a Flink-specific user agent
    -           ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
    -           awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
    -                   " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
    +           ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
    +                           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
    +                                   EnvironmentInformation.getVersion(),
    +                                   EnvironmentInformation.getRevisionInformation().commitId));
     
                // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
    -           AmazonKinesisClient client = new AmazonKinesisClient(
    -                   AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
    +           AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
    +                           .withCredentials(AWSUtil.getCredentialsProvider(configProps))
    +                           .withClientConfiguration(awsClientConfig)
    +                           .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
     
    -           client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
                if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
    -                   client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
    +                   builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
    +                           configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
    +                           configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    --- End diff --
    
    Why does the endpoint configuration have a region now?
    
    For example, let's say a user wants to test the connector against a local
    Kinesis mock service at "localhost:1111". The user was originally issuing
    requests against the regular AWS Kinesis service in region "us-west-1". The
    user's properties would look like this:
    ```
    configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-west-1");
    configProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "localhost:1111");
    ```
    
    In the past, this would correctly redirect requests to "localhost:1111".
    
    With this change, is that still the case? Or do we actually need to call
    `new AwsClientBuilder.EndpointConfiguration(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)`
    instead (that is, not provide a region in the endpoint configuration)?
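
To make the expected behavior concrete, here is a minimal, SDK-free sketch of the either/or choice the question is about: an explicitly configured endpoint takes precedence, and the region is only used when no endpoint is set. The class name, method name, and property key strings are hypothetical stand-ins (assumed, not taken from the PR), and the sketch makes no claim about how `AwsClientBuilder` itself resolves the two settings.

```java
import java.util.Properties;

/**
 * Stand-alone sketch (no AWS SDK on the classpath) of choosing between a
 * custom endpoint and a region. All names in this class are hypothetical.
 */
public class EndpointChoiceSketch {
    // Assumed key strings, standing in for AWSConfigConstants.AWS_REGION
    // and AWSConfigConstants.AWS_ENDPOINT.
    static final String REGION_KEY = "aws.region";
    static final String ENDPOINT_KEY = "aws.endpoint";

    /** Returns a label describing where requests would be sent. */
    public static String resolveTarget(Properties configProps) {
        if (configProps.containsKey(ENDPOINT_KEY)) {
            // An explicit endpoint wins; the region is deliberately not consulted.
            return "endpoint:" + configProps.getProperty(ENDPOINT_KEY);
        }
        // No override configured: fall back to the regional service endpoint.
        return "region:" + configProps.getProperty(REGION_KEY);
    }

    public static void main(String[] args) {
        Properties configProps = new Properties();
        configProps.setProperty(REGION_KEY, "us-west-1");
        configProps.setProperty(ENDPOINT_KEY, "localhost:1111");
        // Both properties are set, so the endpoint branch is taken.
        System.out.println(resolveTarget(configProps)); // prints "endpoint:localhost:1111"
    }
}
```

In the builder code from the diff, the two branches would presumably map to calling either `withEndpointConfiguration(...)` or `withRegion(...)`, but not both; whether the builder accepts both together is worth verifying against the SDK.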

