Github user mpouttuclarke commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125076287

--- Diff: docs/dev/connectors/kinesis.md ---
@@ -72,12 +72,80 @@ Before consuming data from Kinesis streams, make sure that all streams are creat
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-Properties consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), ConsumerConfigConstants.InitialPosition.LATEST));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, ConsumerConfigConstants.InitialPosition.LATEST))
+{% endhighlight %}
+</div>
+</div>
+
+The above is a simple example of using the Kinesis consumer when running on an Amazon Linux node (such as in EMR or AWS Lambda).
+The AWS APIs automatically provide the authentication credentials and region when available. For unit testing, the ability to
+set test configuration is provided using KinesisConfigUtil.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema()));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
 consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
 consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
 consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+val env = StreamExecutionEnvironment.getEnvironment
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema))
+{% endhighlight %}
+</div>
+</div>
+
+Configuration for the consumer can also be supplied with `java.util.Properties` for use on non-Amazon Linux hardware,
+or in the case that other stream consumer properties need to be tuned.
+
+Please note it is strongly recommended to use Kinesis streams within the same availability zone they originate in.
--- End diff --

We completely restrict cross-region network traffic within Amazon, except in special circumstances, for these reasons. These are lessons learned from scaling our systems globally, where problems arise not only from performance concerns but also from regulatory issues, such as EU data laws restricting data egress. Customers should be aware of these issues and discuss them with support before going down this path.
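One way to keep a job from silently reading across regions is a pre-flight check before the source is built. The sketch below is illustrative only: `SameRegionGuard` and its methods are hypothetical (not part of Flink or of this PR), it assumes the stream's ARN is known when the job is configured, and it assumes the region key string is `aws.region` (the value behind `ConsumerConfigConstants.AWS_REGION`; in a real job you would use the constant instead).

```java
import java.util.Properties;

/**
 * Hypothetical pre-flight check (not part of Flink): refuse to start a job whose
 * configured consumer region differs from the region encoded in the stream's ARN.
 */
public final class SameRegionGuard {

    // Assumed to match the string value of ConsumerConfigConstants.AWS_REGION;
    // hard-coded so this sketch compiles without the Kinesis connector on the classpath.
    static final String AWS_REGION_KEY = "aws.region";

    private SameRegionGuard() {}

    /** Returns the region field of an ARN such as arn:aws:kinesis:us-east-1:123456789012:stream/name. */
    static String regionFromArn(String arn) {
        // ARN layout: arn:partition:service:region:account-id:resource
        String[] parts = arn.split(":", 6);
        if (parts.length < 6 || !parts[0].equals("arn")) {
            throw new IllegalArgumentException("Not an ARN: " + arn);
        }
        return parts[3];
    }

    /** Throws if the configured region is missing or differs from the stream's region. */
    static void requireSameRegion(Properties consumerConfig, String streamArn) {
        String configured = consumerConfig.getProperty(AWS_REGION_KEY);
        String streamRegion = regionFromArn(streamArn);
        if (configured == null) {
            throw new IllegalStateException("No region configured; expected " + streamRegion);
        }
        if (!configured.equals(streamRegion)) {
            throw new IllegalStateException(
                "Cross-region read: consumer in " + configured + ", stream in " + streamRegion);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(AWS_REGION_KEY, "us-east-1");
        // Same region on both sides, so no exception is thrown.
        requireSameRegion(config, "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis_stream_name");
        System.out.println("Region check passed");
    }
}
```

Failing fast at submission time makes a cross-region read an explicit decision (one that can then go through support, as suggested above) rather than something discovered from latency or egress costs later.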