[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r125445183

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -117,12 +117,77 @@
 	//
 	//  Constructors
 	//
+	/**
+	 * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+	 * node, with the stream initial position set to LATEST.
+	 *
+	 * @param stream
+	 *           The single AWS Kinesis stream to read from.
+	 * @param deserializer
+	 *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+	 */
+	public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer) {
+		this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer));
+	}
+
+	/**
+	 * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+	 * node, with the provided initial position in the stream.
+	 *
+	 * @param stream
+	 * @param deserializer
+	 * @param initialPosition

--- End diff --

These params need descriptions for the Javadoc also.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r125445109

--- Diff: docs/dev/connectors/kinesis.md ---
@@ -72,12 +72,80 @@ Before consuming data from Kinesis streams, make sure that all streams are creat

 {% highlight java %}
-Properties consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), ConsumerConfigConstants.InitialPosition.LATEST));
+{% endhighlight %}
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, ConsumerConfigConstants.InitialPosition.LATEST))
+{% endhighlight %}
+
+The above is a simple example of using the Kinesis consumer when running on an Amazon Linux node (such as in EMR or AWS Lambda).
+The AWS APIs automatically provide the authentication credentials and region when available. For unit testing, the ability to
+set test configuration is provided using KinesisConfigUtil.
+
+{% highlight java %}
+Properties testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema()));
+{% endhighlight %}
+
+{% highlight scala %}
+val testConfig = new Properties()
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1")
+testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+KinesisConfigUtil.setDefaultTestProperties(testConfig)
+
+// Automatically uses testConfig without having to modify job flow
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema))
+{% endhighlight %}
+
+Configuration for the consumer can also be supplied with `java.util.Properties` for use on non-Amazon Linux hardware,
+or in the case that other stream consumer properties need to be tuned.
+
+Please note it is strongly recommended to use Kinesis streams within the same availability zone they originate in.

--- End diff --

Thanks for the explanation! That makes a lot of sense.
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r125445000

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -117,12 +117,77 @@
 	//
 	//  Constructors
 	//
+	/**
+	 * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+	 * node, with the stream initial position set to LATEST.
+	 *
+	 * @param stream
+	 *           The single AWS Kinesis stream to read from.
+	 * @param deserializer
+	 *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+	 */
+	public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer) {
+		this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer));
+	}
+
+	/**
+	 * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+	 * node, with the provided initial position in the stream.
+	 *
+	 * @param stream
+	 * @param deserializer
+	 * @param initialPosition
+	 */
+	public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, InitialPosition initialPosition) {
+		this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), initialPosition);
+	}
+
+	/**
+	 * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+	 * node, with the stream initial position set to LATEST.
+	 *
+	 * @param stream
+	 *           The single AWS Kinesis stream to read from.
+	 * @param deserializer
+	 *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+	 */
+	public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer) {
+		this(stream, deserializer, getDefaultConfigProperties());
+	}
+
+	/**
+	 * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+	 * node, with the provided initial position in the stream.
+	 *
+	 * @param stream
+	 *           The single AWS Kinesis stream to read from.
+	 * @param deserializer
+	 *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+	 * @param initialPosition
+	 *           Where to start reading in the Kinesis stream.
+	 */
+	public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, InitialPosition initialPosition) {
+		this(stream, deserializer, getDefaultPropsWithInitialPosition(initialPosition));
+	}
+
+	private static Properties getDefaultPropsWithInitialPosition(InitialPosition initialPosition) {
+		final Properties conf = getDefaultConfigProperties();
+		conf.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, initialPosition.name());

--- End diff --

This properties building is problematic. For example, we also have an `InitialPosition.AT_TIMESTAMP` configuration, which involves 2 keys in the properties.
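The problem tzulitai points out can be illustrated with a small standalone sketch (not the connector's actual code). The string constants below merely stand in for the real `ConsumerConfigConstants.STREAM_INITIAL_POSITION` and `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP` keys, and the helper mirrors the shape of the single-key `getDefaultPropsWithInitialPosition` under review: an enum-only constructor has no parameter through which the second, timestamp key could ever be supplied.

```java
import java.util.Properties;

public class InitialPositionSketch {

    // Stand-ins for the real ConsumerConfigConstants keys (names assumed).
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

    // Mirrors the single-key helper under review: it can only ever set
    // the position name, never an accompanying timestamp.
    static Properties propsWithInitialPosition(String initialPositionName) {
        Properties conf = new Properties();
        conf.setProperty(STREAM_INITIAL_POSITION, initialPositionName);
        return conf;
    }

    public static void main(String[] args) {
        // Fine for LATEST or TRIM_HORIZON: one key fully describes the position.
        Properties latest = propsWithInitialPosition("LATEST");
        System.out.println(latest.containsKey(STREAM_INITIAL_POSITION));     // true

        // Broken for AT_TIMESTAMP: the required timestamp key is missing,
        // and the enum-based constructor has no way to provide it.
        Properties atTimestamp = propsWithInitialPosition("AT_TIMESTAMP");
        System.out.println(atTimestamp.containsKey(STREAM_INITIAL_TIMESTAMP)); // false
    }
}
```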
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r125445059

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -117,12 +117,77 @@
 	//
 	//  Constructors
 	//
+	/**

--- End diff --

nit: add empty line before /**
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user mpouttuclarke commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r125076287

--- Diff: docs/dev/connectors/kinesis.md ---
@@ -72,12 +72,80 @@ Before consuming data from Kinesis streams, make sure that all streams are creat

 {% highlight java %}
-Properties consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), ConsumerConfigConstants.InitialPosition.LATEST));
+{% endhighlight %}
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, ConsumerConfigConstants.InitialPosition.LATEST))
+{% endhighlight %}
+
+The above is a simple example of using the Kinesis consumer when running on an Amazon Linux node (such as in EMR or AWS Lambda).
+The AWS APIs automatically provide the authentication credentials and region when available. For unit testing, the ability to
+set test configuration is provided using KinesisConfigUtil.
+
+{% highlight java %}
+Properties testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema()));
+{% endhighlight %}
+
+{% highlight scala %}
+val testConfig = new Properties()
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1")
+testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+KinesisConfigUtil.setDefaultTestProperties(testConfig)
+
+// Automatically uses testConfig without having to modify job flow
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema))
+{% endhighlight %}
+
+Configuration for the consumer can also be supplied with `java.util.Properties` for use on non-Amazon Linux hardware,
+or in the case that other stream consumer properties need to be tuned.
+
+Please note it is strongly recommended to use Kinesis streams within the same availability zone they originate in.

--- End diff --

We completely restrict cross-region network traffic within Amazon, except in special circumstances, for these reasons. These are simply lessons learned from scaling our systems globally, where situations arise not only from performance concerns but also from regulatory issues, such as EU data laws restricting data egress. Customers should be aware of these issues and discuss them with support before going down this path.
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user mpouttuclarke commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r125074194

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
 		}

 		if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
-			throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+			final Region currentRegion = Regions.getCurrentRegion();
+			if (currentRegion != null) {
+				config.setProperty(AWSConfigConstants.AWS_REGION, currentRegion.getName());
+			} else {
+				throw new IllegalArgumentException("The AWS region could not be identified automatically from the AWS API. " +

--- End diff --

Yes I like that wording.
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user mpouttuclarke commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r125073534

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
 		}

 		if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {

--- End diff --

The new constructors make the easy path the right path. We go through a lot of trouble at Amazon to make sure that the default constructors do the right thing with the minimal amount of effort. Yet people still set things like region and auth manually, when doing so is not only unnecessary but also a security, performance, and compliance risk. Wherever we can, we should follow the example of the AWS SDK and make correct usage the easiest option.

Overall, I would argue that using property files and statics isn't a best practice. There really should be type-safe POJOs and dependency injection in place for configuration of the consumer, but that is a larger issue than I can take on right now. The new constructors attempt to add some type safety while improving ease of use when operating in an Amazon environment.
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r124967289

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
 		}

 		if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {

--- End diff --

I wonder if it's actually sufficient to rely on this check, without the need for the bunch of new constructors you added. I.e., it is sufficient that the user doesn't supply an `AWSConfigConstants.AWS_REGION` value in the props, in which case we would default to using `Regions.getCurrentRegion()`.
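The fallback being discussed can be sketched standalone, without Flink or the AWS SDK on the classpath. In this hypothetical sketch, the `AWS_REGION` constant stands in for `AWSConfigConstants.AWS_REGION`, and the `Supplier` parameter stands in for `Regions.getCurrentRegion()`, which returns null when not running on AWS infrastructure:

```java
import java.util.Properties;
import java.util.function.Supplier;

public class RegionFallbackSketch {

    // Stand-in for AWSConfigConstants.AWS_REGION (key name assumed).
    static final String AWS_REGION = "aws.region";

    // If the user supplied no region, fall back to the environment-provided
    // one; fail with a clear message only when neither source has it.
    static void validateAwsConfiguration(Properties config, Supplier<String> currentRegion) {
        if (!config.containsKey(AWS_REGION)) {
            String region = currentRegion.get(); // stand-in for Regions.getCurrentRegion()
            if (region != null) {
                config.setProperty(AWS_REGION, region);
            } else {
                throw new IllegalArgumentException(
                        "The AWS region could not be identified automatically; "
                        + "please set '" + AWS_REGION + "' in the config.");
            }
        }
    }

    public static void main(String[] args) {
        // On an AWS node: the region is discovered and filled in.
        Properties onAws = new Properties();
        validateAwsConfiguration(onAws, () -> "us-east-1");
        System.out.println(onAws.getProperty(AWS_REGION)); // us-east-1

        // Off AWS with no explicit region: fails fast with a clear message.
        try {
            validateAwsConfiguration(new Properties(), () -> null);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```

Passing the lookup as a `Supplier` here is purely for testability of the sketch; the actual PR calls `Regions.getCurrentRegion()` directly inside `validateAwsConfiguration`.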
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4228#discussion_r124967667

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
 		}

 		if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
-			throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+			final Region currentRegion = Regions.getCurrentRegion();
+			if (currentRegion != null) {
+				config.setProperty(AWSConfigConstants.AWS_REGION, currentRegion.getName());
+			} else {
+				throw new IllegalArgumentException("The AWS region could not be identified automatically from the AWS API. " +

--- End diff --

I would rephrase this:

> The AWS region could not be identified automatically from the environment. Automatic AWS region retrieval is only available when the consumer is run in AWS environments, which is recommended for production usage to avoid cross-region network traffic. When executed in non-AWS environments, please set the region manually using the property AWSConfigConstants.AWS_REGION.
[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...
GitHub user mpouttuclarke opened a pull request:

    https://github.com/apache/flink/pull/4228

    Flink-7035 Automatically identify AWS Region, simplified constructors, added test properties support

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mpouttuclarke/flink FLINK-7035

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4228.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4228