GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4228#discussion_r125445109
  
    --- Diff: docs/dev/connectors/kinesis.md ---
    @@ -72,12 +72,80 @@ Before consuming data from Kinesis streams, make sure that all streams are creat
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -Properties consumerConfig = new Properties();
    -consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
    +
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), ConsumerConfigConstants.InitialPosition.LATEST));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getEnvironment
    +
    +val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    +    "kinesis_stream_name", new SimpleStringSchema, ConsumerConfigConstants.InitialPosition.LATEST))
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +The above is a simple example of using the Kinesis consumer when running on an Amazon Linux node (such as in EMR or AWS Lambda).
    +The AWS APIs automatically provide the authentication credentials and region when available.  For unit testing, the ability to
    +set test configuration is provided using KinesisConfigUtil.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +Properties testConfig = new Properties();
    +testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
    +testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
    +testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
    +
    +KinesisConfigUtil.setDefaultTestProperties(testConfig);
    +
    +// Automatically uses testConfig without having to modify job flow
    +StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema()));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val testConfig = new Properties();
    +testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
     consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
     consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
     
    +KinesisConfigUtil.setDefaultTestProperties(testConfig);
    +
    +// Automatically uses testConfig without having to modify job flow
    +val env = StreamExecutionEnvironment.getEnvironment
    +val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    +    "kinesis_stream_name", new SimpleStringSchema))
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Configuration for the consumer can also be supplied with `java.util.Properties` for use on non-Amazon Linux hardware,
    +or in the case that other stream consumer properties need to be tuned.
    +
    +Please note it is strongly recommended to use Kinesis streams within the same availability zone they originate in.
    --- End diff --
    
    Thanks for the explanation! That makes a lot of sense.


